• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module providing the `Process` class which emulates `threading.Thread`
3#
4# multiprocessing/process.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10__all__ = ['BaseProcess', 'current_process', 'active_children',
11           'parent_process']
12
13#
14# Imports
15#
16
17import os
18import sys
19import signal
20import itertools
21import threading
22from _weakrefset import WeakSet
23
24#
25#
26#
27
28try:
29    ORIGINAL_DIR = os.path.abspath(os.getcwd())
30except OSError:
31    ORIGINAL_DIR = None
32
33#
34# Public functions
35#
36
37def current_process():
38    '''
39    Return process object representing the current process
40    '''
41    return _current_process
42
43def active_children():
44    '''
45    Return list of process objects corresponding to live child processes
46    '''
47    _cleanup()
48    return list(_children)
49
50
51def parent_process():
52    '''
53    Return process object representing the parent process
54    '''
55    return _parent_process
56
57#
58#
59#
60
61def _cleanup():
62    # check for processes which have finished
63    for p in list(_children):
64        if (child_popen := p._popen) and child_popen.poll() is not None:
65            _children.discard(p)
66
67#
68# The `Process` class
69#
70
71class BaseProcess(object):
72    '''
73    Process objects represent activity that is run in a separate process
74
75    The class is analogous to `threading.Thread`
76    '''
77    def _Popen(self):
78        raise NotImplementedError
79
80    def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
81                 *, daemon=None):
82        assert group is None, 'group argument must be None for now'
83        count = next(_process_counter)
84        self._identity = _current_process._identity + (count,)
85        self._config = _current_process._config.copy()
86        self._parent_pid = os.getpid()
87        self._parent_name = _current_process.name
88        self._popen = None
89        self._closed = False
90        self._target = target
91        self._args = tuple(args)
92        self._kwargs = dict(kwargs)
93        self._name = name or type(self).__name__ + '-' + \
94                     ':'.join(str(i) for i in self._identity)
95        if daemon is not None:
96            self.daemon = daemon
97        _dangling.add(self)
98
99    def _check_closed(self):
100        if self._closed:
101            raise ValueError("process object is closed")
102
103    def run(self):
104        '''
105        Method to be run in sub-process; can be overridden in sub-class
106        '''
107        if self._target:
108            self._target(*self._args, **self._kwargs)
109
110    def start(self):
111        '''
112        Start child process
113        '''
114        self._check_closed()
115        assert self._popen is None, 'cannot start a process twice'
116        assert self._parent_pid == os.getpid(), \
117               'can only start a process object created by current process'
118        assert not _current_process._config.get('daemon'), \
119               'daemonic processes are not allowed to have children'
120        _cleanup()
121        self._popen = self._Popen(self)
122        self._sentinel = self._popen.sentinel
123        # Avoid a refcycle if the target function holds an indirect
124        # reference to the process object (see bpo-30775)
125        del self._target, self._args, self._kwargs
126        _children.add(self)
127
128    def terminate(self):
129        '''
130        Terminate process; sends SIGTERM signal or uses TerminateProcess()
131        '''
132        self._check_closed()
133        self._popen.terminate()
134
135    def kill(self):
136        '''
137        Terminate process; sends SIGKILL signal or uses TerminateProcess()
138        '''
139        self._check_closed()
140        self._popen.kill()
141
142    def join(self, timeout=None):
143        '''
144        Wait until child process terminates
145        '''
146        self._check_closed()
147        assert self._parent_pid == os.getpid(), 'can only join a child process'
148        assert self._popen is not None, 'can only join a started process'
149        res = self._popen.wait(timeout)
150        if res is not None:
151            _children.discard(self)
152
153    def is_alive(self):
154        '''
155        Return whether process is alive
156        '''
157        self._check_closed()
158        if self is _current_process:
159            return True
160        assert self._parent_pid == os.getpid(), 'can only test a child process'
161
162        if self._popen is None:
163            return False
164
165        returncode = self._popen.poll()
166        if returncode is None:
167            return True
168        else:
169            _children.discard(self)
170            return False
171
172    def close(self):
173        '''
174        Close the Process object.
175
176        This method releases resources held by the Process object.  It is
177        an error to call this method if the child process is still running.
178        '''
179        if self._popen is not None:
180            if self._popen.poll() is None:
181                raise ValueError("Cannot close a process while it is still running. "
182                                 "You should first call join() or terminate().")
183            self._popen.close()
184            self._popen = None
185            del self._sentinel
186            _children.discard(self)
187        self._closed = True
188
189    @property
190    def name(self):
191        return self._name
192
193    @name.setter
194    def name(self, name):
195        assert isinstance(name, str), 'name must be a string'
196        self._name = name
197
198    @property
199    def daemon(self):
200        '''
201        Return whether process is a daemon
202        '''
203        return self._config.get('daemon', False)
204
205    @daemon.setter
206    def daemon(self, daemonic):
207        '''
208        Set whether process is a daemon
209        '''
210        assert self._popen is None, 'process has already started'
211        self._config['daemon'] = daemonic
212
213    @property
214    def authkey(self):
215        return self._config['authkey']
216
217    @authkey.setter
218    def authkey(self, authkey):
219        '''
220        Set authorization key of process
221        '''
222        self._config['authkey'] = AuthenticationString(authkey)
223
224    @property
225    def exitcode(self):
226        '''
227        Return exit code of process or `None` if it has yet to stop
228        '''
229        self._check_closed()
230        if self._popen is None:
231            return self._popen
232        return self._popen.poll()
233
234    @property
235    def ident(self):
236        '''
237        Return identifier (PID) of process or `None` if it has yet to start
238        '''
239        self._check_closed()
240        if self is _current_process:
241            return os.getpid()
242        else:
243            return self._popen and self._popen.pid
244
245    pid = ident
246
247    @property
248    def sentinel(self):
249        '''
250        Return a file descriptor (Unix) or handle (Windows) suitable for
251        waiting for process termination.
252        '''
253        self._check_closed()
254        try:
255            return self._sentinel
256        except AttributeError:
257            raise ValueError("process not started") from None
258
259    def __repr__(self):
260        exitcode = None
261        if self is _current_process:
262            status = 'started'
263        elif self._closed:
264            status = 'closed'
265        elif self._parent_pid != os.getpid():
266            status = 'unknown'
267        elif self._popen is None:
268            status = 'initial'
269        else:
270            exitcode = self._popen.poll()
271            if exitcode is not None:
272                status = 'stopped'
273            else:
274                status = 'started'
275
276        info = [type(self).__name__, 'name=%r' % self._name]
277        if self._popen is not None:
278            info.append('pid=%s' % self._popen.pid)
279        info.append('parent=%s' % self._parent_pid)
280        info.append(status)
281        if exitcode is not None:
282            exitcode = _exitcode_to_name.get(exitcode, exitcode)
283            info.append('exitcode=%s' % exitcode)
284        if self.daemon:
285            info.append('daemon')
286        return '<%s>' % ' '.join(info)
287
288    ##
289
290    def _bootstrap(self, parent_sentinel=None):
291        from . import util, context
292        global _current_process, _parent_process, _process_counter, _children
293
294        try:
295            if self._start_method is not None:
296                context._force_start_method(self._start_method)
297            _process_counter = itertools.count(1)
298            _children = set()
299            util._close_stdin()
300            old_process = _current_process
301            _current_process = self
302            _parent_process = _ParentProcess(
303                self._parent_name, self._parent_pid, parent_sentinel)
304            if threading._HAVE_THREAD_NATIVE_ID:
305                threading.main_thread()._set_native_id()
306            try:
307                self._after_fork()
308            finally:
309                # delay finalization of the old process object until after
310                # _run_after_forkers() is executed
311                del old_process
312            util.info('child process calling self.run()')
313            self.run()
314            exitcode = 0
315        except SystemExit as e:
316            if e.code is None:
317                exitcode = 0
318            elif isinstance(e.code, int):
319                exitcode = e.code
320            else:
321                sys.stderr.write(str(e.code) + '\n')
322                exitcode = 1
323        except:
324            exitcode = 1
325            import traceback
326            sys.stderr.write('Process %s:\n' % self.name)
327            traceback.print_exc()
328        finally:
329            threading._shutdown()
330            util.info('process exiting with exitcode %d' % exitcode)
331            util._flush_std_streams()
332
333        return exitcode
334
335    @staticmethod
336    def _after_fork():
337        from . import util
338        util._finalizer_registry.clear()
339        util._run_after_forkers()
340
341
342#
343# We subclass bytes to avoid accidental transmission of auth keys over network
344#
345
346class AuthenticationString(bytes):
347    def __reduce__(self):
348        from .context import get_spawning_popen
349        if get_spawning_popen() is None:
350            raise TypeError(
351                'Pickling an AuthenticationString object is '
352                'disallowed for security reasons'
353                )
354        return AuthenticationString, (bytes(self),)
355
356
357#
358# Create object representing the parent process
359#
360
361class _ParentProcess(BaseProcess):
362
363    def __init__(self, name, pid, sentinel):
364        self._identity = ()
365        self._name = name
366        self._pid = pid
367        self._parent_pid = None
368        self._popen = None
369        self._closed = False
370        self._sentinel = sentinel
371        self._config = {}
372
373    def is_alive(self):
374        from multiprocessing.connection import wait
375        return not wait([self._sentinel], timeout=0)
376
377    @property
378    def ident(self):
379        return self._pid
380
381    def join(self, timeout=None):
382        '''
383        Wait until parent process terminates
384        '''
385        from multiprocessing.connection import wait
386        wait([self._sentinel], timeout=timeout)
387
388    pid = ident
389
390#
391# Create object representing the main process
392#
393
394class _MainProcess(BaseProcess):
395
396    def __init__(self):
397        self._identity = ()
398        self._name = 'MainProcess'
399        self._parent_pid = None
400        self._popen = None
401        self._closed = False
402        self._config = {'authkey': AuthenticationString(os.urandom(32)),
403                        'semprefix': '/mp'}
404        # Note that some versions of FreeBSD only allow named
405        # semaphores to have names of up to 14 characters.  Therefore
406        # we choose a short prefix.
407        #
408        # On MacOSX in a sandbox it may be necessary to use a
409        # different prefix -- see #19478.
410        #
411        # Everything in self._config will be inherited by descendant
412        # processes.
413
414    def close(self):
415        pass
416
417
418_parent_process = None
419_current_process = _MainProcess()
420_process_counter = itertools.count(1)
421_children = set()
422del _MainProcess
423
424#
425# Give names to some return codes
426#
427
428_exitcode_to_name = {}
429
430for name, signum in list(signal.__dict__.items()):
431    if name[:3]=='SIG' and '_' not in name:
432        _exitcode_to_name[-signum] = f'-{name}'
433del name, signum
434
435# For debug and leak testing
436_dangling = WeakSet()
437