• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#
# Module providing the `Process` class which emulates `threading.Thread`
#
# multiprocessing/process.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
9
# Names exported by `from ... import *`
__all__ = [
    'BaseProcess',
    'current_process',
    'active_children',
    'parent_process',
]
12
13#
14# Imports
15#
16
17import os
18import sys
19import signal
20import itertools
21import threading
22from _weakrefset import WeakSet
23
24#
25#
26#
27
# Absolute path of the working directory at import time, or None when
# the working directory cannot be determined.
ORIGINAL_DIR = None
try:
    ORIGINAL_DIR = os.path.abspath(os.getcwd())
except OSError:
    pass
32
33#
34# Public functions
35#
36
def current_process():
    '''
    Return the process object that stands for the process this code
    is running in
    '''
    return _current_process
42
def active_children():
    '''
    Return a new list holding the process objects of the children
    that are still tracked after finished ones have been cleaned up
    '''
    # forget children that have already finished before taking the snapshot
    _cleanup()
    return [*_children]
49
50
def parent_process():
    '''
    Return the process object that stands for the parent of the
    current process (the module-level value, which starts out as None)
    '''
    return _parent_process
56
57#
58#
59#
60
def _cleanup():
    '''
    Stop tracking child processes which have finished

    Every process in `_children` whose popen object reports an exit
    status is removed from the set.
    '''
    # iterate over a snapshot because the set is modified inside the loop
    for p in list(_children):
        # `close()` resets `_popen` to None before the process is dropped
        # from `_children`, so a process being closed concurrently may
        # still be listed here without a popen object; skip it instead
        # of failing with AttributeError.
        popen = p._popen
        if popen is not None and popen.poll() is not None:
            _children.discard(p)
66
67#
68# The `Process` class
69#
70
class BaseProcess:
    '''
    Handle for an activity that is executed in a separate process

    The interface is modelled on `threading.Thread`.
    '''

    def _Popen(self):
        # Launcher hook used by `start()`; the base class provides none.
        raise NotImplementedError

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                 *, daemon=None):
        '''
        Prepare a process object without starting it

        `group` must be None.  `target` is called by `run()` with
        `args` and `kwargs` (both are copied).  `name` defaults to the
        class name followed by the identity tuple joined with ':'.
        `daemon`, when not None, sets the daemon flag.
        '''
        assert group is None, 'group argument must be None for now'
        serial = next(_process_counter)
        creator = _current_process
        # identity is the creator's identity extended by a per-creator serial
        self._identity = creator._identity + (serial,)
        # start from a copy of the creating process's configuration
        self._config = creator._config.copy()
        self._parent_pid = os.getpid()
        self._parent_name = creator.name
        self._popen = None
        self._closed = False
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        if name:
            self._name = name
        else:
            identity_text = ':'.join(map(str, self._identity))
            self._name = type(self).__name__ + '-' + identity_text
        if daemon is not None:
            self.daemon = daemon
        _dangling.add(self)

    def _check_closed(self):
        # Raise ValueError once `close()` has been called on this object.
        if self._closed:
            raise ValueError("process object is closed")

    def run(self):
        '''
        Code executed in the sub-process; sub-classes may override it

        The default calls the target (if any) with the stored arguments.
        '''
        target = self._target
        if not target:
            return
        target(*self._args, **self._kwargs)

    def start(self):
        '''
        Launch the child process
        '''
        self._check_closed()
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._config.get('daemon'), \
               'daemonic processes are not allowed to have children'
        _cleanup()
        popen = self._Popen(self)
        self._popen = popen
        self._sentinel = popen.sentinel
        # Drop the references to the target and its arguments so that no
        # refcycle is formed when the target indirectly refers back to
        # this process object (see bpo-30775)
        del self._target, self._args, self._kwargs
        _children.add(self)

    def terminate(self):
        '''
        Stop the process; sends SIGTERM signal or uses TerminateProcess()
        '''
        self._check_closed()
        self._popen.terminate()

    def kill(self):
        '''
        Stop the process; sends SIGKILL signal or uses TerminateProcess()
        '''
        self._check_closed()
        self._popen.kill()

    def join(self, timeout=None):
        '''
        Block until the child process terminates or `timeout` is passed
        to the popen object's `wait()` and it returns
        '''
        self._check_closed()
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        # a non-None result from wait() means the child has finished
        if self._popen.wait(timeout) is not None:
            _children.discard(self)

    def is_alive(self):
        '''
        Tell whether the process is currently running
        '''
        self._check_closed()
        if self is _current_process:
            return True
        assert self._parent_pid == os.getpid(), 'can only test a child process'

        popen = self._popen
        if popen is None:
            # never started
            return False
        if popen.poll() is None:
            return True
        # finished: stop tracking it as a live child
        _children.discard(self)
        return False

    def close(self):
        '''
        Release the resources held by this Process object.

        Raises ValueError if the child process is still running.
        '''
        popen = self._popen
        if popen is not None:
            if popen.poll() is None:
                raise ValueError("Cannot close a process while it is still running. "
                                 "You should first call join() or terminate().")
            popen.close()
            self._popen = None
            del self._sentinel
            _children.discard(self)
        self._closed = True

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, name):
        assert isinstance(name, str), 'name must be a string'
        self._name = name

    @property
    def daemon(self):
        '''
        Daemon flag of the process (False when it was never set)
        '''
        return self._config.get('daemon', False)

    @daemon.setter
    def daemon(self, daemonic):
        '''
        Change the daemon flag; only possible before the process starts
        '''
        assert self._popen is None, 'process has already started'
        self._config['daemon'] = daemonic

    @property
    def authkey(self):
        return self._config['authkey']

    @authkey.setter
    def authkey(self, authkey):
        '''
        Store the authorization key wrapped in an AuthenticationString
        '''
        self._config['authkey'] = AuthenticationString(authkey)

    @property
    def exitcode(self):
        '''
        Exit code of the process, or `None` if it has not stopped yet
        '''
        self._check_closed()
        popen = self._popen
        return None if popen is None else popen.poll()

    @property
    def ident(self):
        '''
        Identifier (PID) of the process, or `None` if it has not started
        '''
        self._check_closed()
        if self is _current_process:
            return os.getpid()
        popen = self._popen
        return popen and popen.pid

    pid = ident

    @property
    def sentinel(self):
        '''
        File descriptor (Unix) or handle (Windows) that can be waited on
        to detect termination of the process.
        '''
        self._check_closed()
        try:
            handle = self._sentinel
        except AttributeError:
            raise ValueError("process not started") from None
        return handle

    def __repr__(self):
        popen = self._popen
        exitcode = None
        if self is _current_process:
            status = 'started'
        elif self._closed:
            status = 'closed'
        elif self._parent_pid != os.getpid():
            # only the creating process can poll the child
            status = 'unknown'
        elif popen is None:
            status = 'initial'
        else:
            exitcode = popen.poll()
            status = 'started' if exitcode is None else 'stopped'

        parts = [type(self).__name__, 'name=%r' % self._name]
        if popen is not None:
            parts.append('pid=%s' % popen.pid)
        parts += ['parent=%s' % self._parent_pid, status]
        if exitcode is not None:
            # show the signal name for exit codes that have one
            shown = _exitcode_to_name.get(exitcode, exitcode)
            parts.append('exitcode=%s' % shown)
        if self.daemon:
            parts.append('daemon')
        return '<%s>' % ' '.join(parts)

    ##

    def _bootstrap(self, parent_sentinel=None):
        '''
        Install this object as the current process, call `run()` and
        return the resulting exit code
        '''
        from . import util, context
        global _current_process, _parent_process, _process_counter, _children

        try:
            if self._start_method is not None:
                context._force_start_method(self._start_method)
            # begin with a fresh child counter and an empty set of children
            _process_counter = itertools.count(1)
            _children = set()
            util._close_stdin()
            previous = _current_process
            _current_process = self
            _parent_process = _ParentProcess(
                self._parent_name, self._parent_pid, parent_sentinel)
            if threading._HAVE_THREAD_NATIVE_ID:
                threading.main_thread()._set_native_id()
            try:
                util._finalizer_registry.clear()
                util._run_after_forkers()
            finally:
                # keep the previous process object alive until
                # _run_after_forkers() has been executed
                del previous
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()
        except SystemExit as exit_request:
            code = exit_request.code
            if code is None:
                exitcode = 0
            elif isinstance(code, int):
                exitcode = code
            else:
                # a non-integer code is written to stderr and counts as failure
                sys.stderr.write(str(code) + '\n')
                exitcode = 1
        except:
            # any other exception: report it on stderr and exit with 1
            exitcode = 1
            import traceback
            sys.stderr.write('Process %s:\n' % self.name)
            traceback.print_exc()
        finally:
            threading._shutdown()
            util.info('process exiting with exitcode %d' % exitcode)
            util._flush_std_streams()

        return exitcode
338
339#
340# We subclass bytes to avoid accidental transmission of auth keys over network
341#
342
class AuthenticationString(bytes):
    # bytes subclass whose pickling is refused unless a spawning popen
    # object is active

    def __reduce__(self):
        from .context import get_spawning_popen
        if get_spawning_popen() is not None:
            # pickle as the class plus a plain bytes copy of the key
            return AuthenticationString, (bytes(self),)
        raise TypeError(
            'Pickling an AuthenticationString object is '
            'disallowed for security reasons'
            )
352
353
354#
355# Create object representing the parent process
356#
357
class _ParentProcess(BaseProcess):
    # Stand-in for the parent process; its state is tracked through
    # the sentinel handed to the constructor instead of a popen object.

    def __init__(self, name, pid, sentinel):
        self._name = name
        self._pid = pid
        self._sentinel = sentinel
        self._identity = ()
        self._config = {}
        self._parent_pid = None
        self._popen = None
        self._closed = False

    def is_alive(self):
        '''
        Return True while a zero-timeout wait on the sentinel reports
        nothing ready
        '''
        from multiprocessing.connection import wait
        ready = wait([self._sentinel], timeout=0)
        return not ready

    @property
    def ident(self):
        return self._pid

    pid = ident

    def join(self, timeout=None):
        '''
        Block until the parent process terminates or `timeout` expires
        '''
        from multiprocessing.connection import wait
        wait([self._sentinel], timeout=timeout)
386
387#
388# Create object representing the main process
389#
390
class _MainProcess(BaseProcess):
    # Process object describing the main process.

    def __init__(self):
        # Everything in the configuration will be inherited by
        # descendant processes.
        #
        # The semaphore prefix is deliberately short: some versions of
        # FreeBSD only allow named semaphores to have names of up to
        # 14 characters.
        #
        # On MacOSX in a sandbox it may be necessary to use a
        # different prefix -- see #19478.
        config = {'authkey': AuthenticationString(os.urandom(32)),
                  'semprefix': '/mp'}
        self._name = 'MainProcess'
        self._identity = ()
        self._parent_pid = None
        self._popen = None
        self._closed = False
        self._config = config

    def close(self):
        # closing the main process object does nothing
        pass
413
414
# Module-level state describing the process this module was imported in
_current_process = _MainProcess()
_parent_process = None
_children = set()
_process_counter = itertools.count(1)
# the class was only needed to build the object above
del _MainProcess
420
421#
422# Give names to some return codes
423#
424
# Maps the negated number of every signal whose name starts with 'SIG'
# and contains no underscore to '-' followed by that name, e.g.
# -signal.SIGTERM -> '-SIGTERM'.  When several names share a number the
# one listed last in the signal module wins.
#
# Built with a comprehension so that no loop variables are left behind
# as module-level names.
_exitcode_to_name = {
    -signum: f'-{name}'
    for name, signum in list(signal.__dict__.items())
    if name[:3] == 'SIG' and '_' not in name
}

# For debug and leak testing
_dangling = WeakSet()
433