• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module providing various facilities to other parts of the package
3#
4# multiprocessing/util.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10import os
11import itertools
12import sys
13import weakref
14import atexit
15import threading        # we want threading to install it's
16                        # cleanup function before multiprocessing does
17from subprocess import _args_from_interpreter_flags
18
19from . import process
20
21__all__ = [
22    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
23    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
24    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
25    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
26    ]
27
28#
29# Logging
30#
31
32NOTSET = 0
33SUBDEBUG = 5
34DEBUG = 10
35INFO = 20
36SUBWARNING = 25
37
38LOGGER_NAME = 'multiprocessing'
39DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
40
41_logger = None
42_log_to_stderr = False
43
44def sub_debug(msg, *args):
45    if _logger:
46        _logger.log(SUBDEBUG, msg, *args)
47
48def debug(msg, *args):
49    if _logger:
50        _logger.log(DEBUG, msg, *args)
51
52def info(msg, *args):
53    if _logger:
54        _logger.log(INFO, msg, *args)
55
56def sub_warning(msg, *args):
57    if _logger:
58        _logger.log(SUBWARNING, msg, *args)
59
60def get_logger():
61    '''
62    Returns logger used by multiprocessing
63    '''
64    global _logger
65    import logging
66
67    logging._acquireLock()
68    try:
69        if not _logger:
70
71            _logger = logging.getLogger(LOGGER_NAME)
72            _logger.propagate = 0
73
74            # XXX multiprocessing should cleanup before logging
75            if hasattr(atexit, 'unregister'):
76                atexit.unregister(_exit_function)
77                atexit.register(_exit_function)
78            else:
79                atexit._exithandlers.remove((_exit_function, (), {}))
80                atexit._exithandlers.append((_exit_function, (), {}))
81
82    finally:
83        logging._releaseLock()
84
85    return _logger
86
87def log_to_stderr(level=None):
88    '''
89    Turn on logging and add a handler which prints to stderr
90    '''
91    global _log_to_stderr
92    import logging
93
94    logger = get_logger()
95    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
96    handler = logging.StreamHandler()
97    handler.setFormatter(formatter)
98    logger.addHandler(handler)
99
100    if level:
101        logger.setLevel(level)
102    _log_to_stderr = True
103    return _logger
104
105#
106# Function returning a temp directory which will be removed on exit
107#
108
109def get_temp_dir():
110    # get name of a temp directory which will be automatically cleaned up
111    tempdir = process.current_process()._config.get('tempdir')
112    if tempdir is None:
113        import shutil, tempfile
114        tempdir = tempfile.mkdtemp(prefix='pymp-')
115        info('created temp directory %s', tempdir)
116        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
117        process.current_process()._config['tempdir'] = tempdir
118    return tempdir
119
120#
121# Support for reinitialization of objects when bootstrapping a child process
122#
123
124_afterfork_registry = weakref.WeakValueDictionary()
125_afterfork_counter = itertools.count()
126
127def _run_after_forkers():
128    items = list(_afterfork_registry.items())
129    items.sort()
130    for (index, ident, func), obj in items:
131        try:
132            func(obj)
133        except Exception as e:
134            info('after forker raised exception %s', e)
135
136def register_after_fork(obj, func):
137    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
138
139#
140# Finalization using weakrefs
141#
142
143_finalizer_registry = {}
144_finalizer_counter = itertools.count()
145
146
147class Finalize(object):
148    '''
149    Class which supports object finalization using weakrefs
150    '''
151    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
152        assert exitpriority is None or type(exitpriority) is int
153
154        if obj is not None:
155            self._weakref = weakref.ref(obj, self)
156        else:
157            assert exitpriority is not None
158
159        self._callback = callback
160        self._args = args
161        self._kwargs = kwargs or {}
162        self._key = (exitpriority, next(_finalizer_counter))
163        self._pid = os.getpid()
164
165        _finalizer_registry[self._key] = self
166
167    def __call__(self, wr=None,
168                 # Need to bind these locally because the globals can have
169                 # been cleared at shutdown
170                 _finalizer_registry=_finalizer_registry,
171                 sub_debug=sub_debug, getpid=os.getpid):
172        '''
173        Run the callback unless it has already been called or cancelled
174        '''
175        try:
176            del _finalizer_registry[self._key]
177        except KeyError:
178            sub_debug('finalizer no longer registered')
179        else:
180            if self._pid != getpid():
181                sub_debug('finalizer ignored because different process')
182                res = None
183            else:
184                sub_debug('finalizer calling %s with args %s and kwargs %s',
185                          self._callback, self._args, self._kwargs)
186                res = self._callback(*self._args, **self._kwargs)
187            self._weakref = self._callback = self._args = \
188                            self._kwargs = self._key = None
189            return res
190
191    def cancel(self):
192        '''
193        Cancel finalization of the object
194        '''
195        try:
196            del _finalizer_registry[self._key]
197        except KeyError:
198            pass
199        else:
200            self._weakref = self._callback = self._args = \
201                            self._kwargs = self._key = None
202
203    def still_active(self):
204        '''
205        Return whether this finalizer is still waiting to invoke callback
206        '''
207        return self._key in _finalizer_registry
208
209    def __repr__(self):
210        try:
211            obj = self._weakref()
212        except (AttributeError, TypeError):
213            obj = None
214
215        if obj is None:
216            return '<%s object, dead>' % self.__class__.__name__
217
218        x = '<%s object, callback=%s' % (
219                self.__class__.__name__,
220                getattr(self._callback, '__name__', self._callback))
221        if self._args:
222            x += ', args=' + str(self._args)
223        if self._kwargs:
224            x += ', kwargs=' + str(self._kwargs)
225        if self._key[0] is not None:
226            x += ', exitprority=' + str(self._key[0])
227        return x + '>'
228
229
230def _run_finalizers(minpriority=None):
231    '''
232    Run all finalizers whose exit priority is not None and at least minpriority
233
234    Finalizers with highest priority are called first; finalizers with
235    the same priority will be called in reverse order of creation.
236    '''
237    if _finalizer_registry is None:
238        # This function may be called after this module's globals are
239        # destroyed.  See the _exit_function function in this module for more
240        # notes.
241        return
242
243    if minpriority is None:
244        f = lambda p : p[0][0] is not None
245    else:
246        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
247
248    items = [x for x in list(_finalizer_registry.items()) if f(x)]
249    items.sort(reverse=True)
250
251    for key, finalizer in items:
252        sub_debug('calling %s', finalizer)
253        try:
254            finalizer()
255        except Exception:
256            import traceback
257            traceback.print_exc()
258
259    if minpriority is None:
260        _finalizer_registry.clear()
261
262#
263# Clean up on exit
264#
265
266def is_exiting():
267    '''
268    Returns true if the process is shutting down
269    '''
270    return _exiting or _exiting is None
271
272_exiting = False
273
274def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
275                   active_children=process.active_children,
276                   current_process=process.current_process):
277    # We hold on to references to functions in the arglist due to the
278    # situation described below, where this function is called after this
279    # module's globals are destroyed.
280
281    global _exiting
282
283    if not _exiting:
284        _exiting = True
285
286        info('process shutting down')
287        debug('running all "atexit" finalizers with priority >= 0')
288        _run_finalizers(0)
289
290        if current_process() is not None:
291            # We check if the current process is None here because if
292            # it's None, any call to ``active_children()`` will raise
293            # an AttributeError (active_children winds up trying to
294            # get attributes from util._current_process).  One
295            # situation where this can happen is if someone has
296            # manipulated sys.modules, causing this module to be
297            # garbage collected.  The destructor for the module type
298            # then replaces all values in the module dict with None.
299            # For instance, after setuptools runs a test it replaces
300            # sys.modules with a copy created earlier.  See issues
301            # #9775 and #15881.  Also related: #4106, #9205, and
302            # #9207.
303
304            for p in active_children():
305                if p.daemon:
306                    info('calling terminate() for daemon %s', p.name)
307                    p._popen.terminate()
308
309            for p in active_children():
310                info('calling join() for process %s', p.name)
311                p.join()
312
313        debug('running the remaining "atexit" finalizers')
314        _run_finalizers()
315
316atexit.register(_exit_function)
317
318#
319# Some fork aware types
320#
321
322class ForkAwareThreadLock(object):
323    def __init__(self):
324        self._reset()
325        register_after_fork(self, ForkAwareThreadLock._reset)
326
327    def _reset(self):
328        self._lock = threading.Lock()
329        self.acquire = self._lock.acquire
330        self.release = self._lock.release
331
332    def __enter__(self):
333        return self._lock.__enter__()
334
335    def __exit__(self, *args):
336        return self._lock.__exit__(*args)
337
338
339class ForkAwareLocal(threading.local):
340    def __init__(self):
341        register_after_fork(self, lambda obj : obj.__dict__.clear())
342    def __reduce__(self):
343        return type(self), ()
344
345#
346# Close fds except those specified
347#
348
349try:
350    MAXFD = os.sysconf("SC_OPEN_MAX")
351except Exception:
352    MAXFD = 256
353
354def close_all_fds_except(fds):
355    fds = list(fds) + [-1, MAXFD]
356    fds.sort()
357    assert fds[-1] == MAXFD, 'fd too large'
358    for i in range(len(fds) - 1):
359        os.closerange(fds[i]+1, fds[i+1])
360#
361# Close sys.stdin and replace stdin with os.devnull
362#
363
364def _close_stdin():
365    if sys.stdin is None:
366        return
367
368    try:
369        sys.stdin.close()
370    except (OSError, ValueError):
371        pass
372
373    try:
374        fd = os.open(os.devnull, os.O_RDONLY)
375        try:
376            sys.stdin = open(fd, closefd=False)
377        except:
378            os.close(fd)
379            raise
380    except (OSError, ValueError):
381        pass
382
383#
384# Start a program with only specified fds kept open
385#
386
387def spawnv_passfds(path, args, passfds):
388    import _posixsubprocess
389    passfds = sorted(passfds)
390    errpipe_read, errpipe_write = os.pipe()
391    try:
392        return _posixsubprocess.fork_exec(
393            args, [os.fsencode(path)], True, passfds, None, None,
394            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
395            False, False, None)
396    finally:
397        os.close(errpipe_read)
398        os.close(errpipe_write)
399