• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import sys
3import threading
4
5from . import process
6from . import reduction
7
# Empty tuple: a star-import of this module binds no names.
__all__ = ()
9
10#
11# Exceptions
12#
13
class ProcessError(Exception):
    """Base class of the exception types defined in this module.

    BufferTooShort, TimeoutError and AuthenticationError all derive from it.
    """
16
class BufferTooShort(ProcessError):
    """ProcessError subclass; adds no behavior of its own.

    NOTE(review): judging by the name, this signals that a caller-supplied
    buffer was too small -- the raise sites are not in this module.
    """
19
class TimeoutError(ProcessError):
    """ProcessError subclass; adds no behavior of its own.

    Within this module the name shadows the builtin ``TimeoutError``; this
    class derives from ProcessError, not from the builtin.
    """
22
class AuthenticationError(ProcessError):
    """ProcessError subclass; adds no behavior of its own.

    NOTE(review): presumably raised when authentication between processes
    fails -- the raise sites are not in this module.
    """
25
26#
27# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
28#
29
class BaseContext:
    """Base type for contexts.

    Bundles the module's exception types, a few process helpers and factory
    methods for synchronization primitives, queues, pools and shared ctypes
    objects.  Factories that accept a context pass ``self.get_context()`` to
    the object they build.  Submodules are imported lazily, inside each
    method.
    """

    # Exception types, re-exported as attributes of every context.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    # Helpers from the process module, callable without an instance.
    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Return the number of CPUs in the system.

        Raises NotImplementedError when os.cpu_count() reports None.
        '''
        count = os.cpu_count()
        if count is not None:
            return count
        raise NotImplementedError('cannot determine number of cpus')

    def Manager(self):
        '''Create a SyncManager for this context, start it and return it.

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        manager = SyncManager(ctx=self.get_context())
        manager.start()
        return manager

    def Pipe(self, duplex=True):
        '''Return two connection objects connected by a pipe.'''
        from .connection import Pipe as make_pipe
        return make_pipe(duplex)

    def Lock(self):
        '''Return a non-recursive lock object bound to this context.'''
        from .synchronize import Lock
        ctx = self.get_context()
        return Lock(ctx=ctx)

    def RLock(self):
        '''Return a recursive lock object bound to this context.'''
        from .synchronize import RLock
        ctx = self.get_context()
        return RLock(ctx=ctx)

    def Condition(self, lock=None):
        '''Return a condition object, optionally built on `lock`.'''
        from .synchronize import Condition
        ctx = self.get_context()
        return Condition(lock, ctx=ctx)

    def Semaphore(self, value=1):
        '''Return a semaphore object created with `value`.'''
        from .synchronize import Semaphore
        ctx = self.get_context()
        return Semaphore(value, ctx=ctx)

    def BoundedSemaphore(self, value=1):
        '''Return a bounded semaphore object created with `value`.'''
        from .synchronize import BoundedSemaphore
        ctx = self.get_context()
        return BoundedSemaphore(value, ctx=ctx)

    def Event(self):
        '''Return an event object bound to this context.'''
        from .synchronize import Event
        ctx = self.get_context()
        return Event(ctx=ctx)

    def Barrier(self, parties, action=None, timeout=None):
        '''Return a barrier object for `parties` participants.'''
        from .synchronize import Barrier
        ctx = self.get_context()
        return Barrier(parties, action, timeout, ctx=ctx)

    def Queue(self, maxsize=0):
        '''Return a Queue object bound to this context.'''
        from .queues import Queue
        ctx = self.get_context()
        return Queue(maxsize, ctx=ctx)

    def JoinableQueue(self, maxsize=0):
        '''Return a JoinableQueue object bound to this context.'''
        from .queues import JoinableQueue
        ctx = self.get_context()
        return JoinableQueue(maxsize, ctx=ctx)

    def SimpleQueue(self):
        '''Return a SimpleQueue object bound to this context.'''
        from .queues import SimpleQueue
        ctx = self.get_context()
        return SimpleQueue(ctx=ctx)

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Return a process pool object bound to this context.'''
        from .pool import Pool
        ctx = self.get_context()
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=ctx)

    def RawValue(self, typecode_or_type, *args):
        '''Return a shared object (sharedctypes.RawValue).'''
        from .sharedctypes import RawValue as make_raw_value
        return make_raw_value(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Return a shared array (sharedctypes.RawArray).'''
        from .sharedctypes import RawArray as make_raw_array
        return make_raw_array(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Return a synchronized shared object bound to this context.'''
        from .sharedctypes import Value
        ctx = self.get_context()
        return Value(typecode_or_type, *args, lock=lock, ctx=ctx)

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Return a synchronized shared array bound to this context.'''
        from .sharedctypes import Array
        ctx = self.get_context()
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=ctx)

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.

        Does nothing unless running on win32 with ``sys.frozen`` set.
        '''
        if sys.platform != 'win32' or not getattr(sys, 'frozen', False):
            return
        from .spawn import freeze_support
        freeze_support()

    def get_logger(self):
        '''Return the package logger, creating it if it does not exist yet.'''
        from .util import get_logger as fetch_logger
        return fetch_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr.'''
        from .util import log_to_stderr as enable_stderr_logging
        return enable_stderr_logging(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes.
        '''
        # Undocumented.  Importing the connection module is the whole effect
        # here; in previous versions of multiprocessing this only made socket
        # objects inheritable on Windows.
        from . import connection

    def set_executable(self, executable):
        '''Set the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable as apply_executable
        apply_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload as apply_preload
        apply_preload(module_names)

    def get_context(self, method=None):
        '''Return this context, or the concrete context named `method`.

        Raises ValueError when `method` is not a key of _concrete_contexts;
        the found context's _check_available() is called before returning.
        '''
        if method is None:
            return self
        found = _concrete_contexts.get(method)
        if found is None:
            raise ValueError('cannot find context for %r' % method) from None
        found._check_available()
        return found

    def get_start_method(self, allow_none=False):
        '''Return this context's start method name (`allow_none` is unused).'''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raise ValueError: not supported on this base type.'''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        # Stored as the module-level global named 'reduction'.
        module_namespace = globals()
        return module_namespace.get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        globals().update(reduction=reduction)

    def _check_available(self):
        '''Hook called by get_context(); the base version does nothing.'''
215
216#
217# Type of default context -- underlying context can be set at most once
218#
219
class Process(process.BaseProcess):
    """Process type whose launcher is looked up on the default context."""

    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Resolve the module-level default context at call time and hand
        # the work to the _Popen of that context's own Process class.
        launcher = _default_context.get_context().Process._Popen
        return launcher(process_obj)
225
class DefaultContext(BaseContext):
    """Context that defers to an underlying concrete context.

    The underlying ("actual") context starts out unset.  It becomes the
    default passed to the constructor the first time it is needed, or
    whatever set_start_method() selects.
    """
    Process = Process

    def __init__(self, context):
        """Remember `context` as the fallback; no actual context is set yet."""
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        """Return the actual context, or the concrete context for `method`.

        With `method` None the actual context is returned, first being
        fixed to the default if it was still unset.  Otherwise the lookup
        is delegated to BaseContext.get_context().
        """
        if method is None:
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context
        else:
            return super().get_context(method)

    def set_start_method(self, method, force=False):
        """Select the actual context by start method name.

        Raises RuntimeError if a context is already set and `force` is
        false.  ``set_start_method(None, force=True)`` resets the actual
        context to unset.
        """
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        """Return the name of the actual context's start method.

        If no context is set yet, return None when `allow_none` is true;
        otherwise fix the actual context to the default first.
        """
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        """Return a list of the supported start methods, default first."""
        if sys.platform == 'win32':
            return ['spawn']
        # The module-level default context is 'spawn' on macOS and 'fork'
        # on other non-Windows platforms; list that method first so that
        # callers can rely on element 0 being the default.
        if sys.platform == 'darwin':
            methods = ['spawn', 'fork']
        else:
            methods = ['fork', 'spawn']
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods
264
265#
266# Context types for fixed start method
267#
268
if sys.platform == 'win32':

    class SpawnProcess(process.BaseProcess):
        """Process type launched through popen_spawn_win32.Popen."""

        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        """Context fixed to the 'spawn' start method."""

        _name = 'spawn'
        Process = SpawnProcess

    # 'spawn' is the only context defined on this platform.
    _concrete_contexts = {'spawn': SpawnContext()}
    _default_context = DefaultContext(_concrete_contexts['spawn'])

else:

    class ForkProcess(process.BaseProcess):
        """Process type launched through popen_fork.Popen."""

        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        """Process type launched through popen_spawn_posix.Popen."""

        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        """Process type launched through popen_forkserver.Popen."""

        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        """Context fixed to the 'fork' start method."""

        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        """Context fixed to the 'spawn' start method."""

        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        """Context fixed to the 'forkserver' start method."""

        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            """Raise ValueError unless reduction.HAVE_SEND_HANDLE is true."""
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    # One instance per start method, keyed by that context's own name.
    _concrete_contexts = {
        ctx._name: ctx
        for ctx in (ForkContext(), SpawnContext(), ForkServerContext())
    }
    # bpo-33725: running arbitrary code after fork() is no longer reliable
    # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
    _default_context = DefaultContext(
        _concrete_contexts['spawn' if sys.platform == 'darwin' else 'fork'])
336
337#
338# Force the start method
339#
340
def _force_start_method(method):
    """Point the default context at the concrete context named `method`.

    Assigns _actual_context directly, so neither the "already set" check of
    set_start_method() nor _check_available() runs.  An unknown `method`
    raises KeyError from the dict lookup.
    """
    forced = _concrete_contexts[method]
    _default_context._actual_context = forced
343
344#
345# Check that the current thread is spawning a child process
346#
347
# Thread-local storage; get_spawning_popen() and set_spawning_popen() read
# and write its `spawning_popen` attribute.
_tls = threading.local()
349
def get_spawning_popen():
    """Return the current thread's `spawning_popen`, or None if unset."""
    try:
        return _tls.spawning_popen
    except AttributeError:
        return None
352
def set_spawning_popen(popen):
    """Store `popen` as the current thread's `spawning_popen`."""
    setattr(_tls, 'spawning_popen', popen)
355
def assert_spawning(obj):
    """Raise RuntimeError unless this thread has a spawning popen set.

    The error message names the type of `obj`.
    """
    if get_spawning_popen() is not None:
        return
    type_name = type(obj).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
        )
362