#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import os
import itertools
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from multiprocessing.process import current_process, active_children

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Created lazily by get_logger(); the helpers below are no-ops until then.
_logger = None
# Set to True once log_to_stderr() has installed its handler.
_log_to_stderr = False


def sub_debug(msg, *args):
    '''
    Log `msg % args` at level SUBDEBUG if the logger has been created
    '''
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)


def debug(msg, *args):
    '''
    Log `msg % args` at level DEBUG if the logger has been created
    '''
    if _logger:
        _logger.log(DEBUG, msg, *args)


def info(msg, *args):
    '''
    Log `msg % args` at level INFO if the logger has been created
    '''
    if _logger:
        _logger.log(INFO, msg, *args)


def sub_warning(msg, *args):
    '''
    Log `msg % args` at level SUBWARNING if the logger has been created
    '''
    if _logger:
        _logger.log(SUBWARNING, msg, *args)


def get_logger():
    '''
    Returns logger used by multiprocessing

    The logger is created on the first call (under the logging module's
    lock) and cached in the module-level `_logger`; later calls return
    the cached object.
    '''
    global _logger
    import logging
    import atexit

    logging._acquireLock()
    try:
        if _logger is None:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0
            logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
            logging.addLevelName(SUBWARNING, 'SUBWARNING')

            # XXX multiprocessing should cleanup before logging
            # Re-registering moves _exit_function to the end of the atexit
            # list (handlers run last-registered-first).
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger


def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    If `level` is truthy the logger's level is set to it.  Returns the
    multiprocessing logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return logger

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    '''
    Return the name of a temp directory which is automatically cleaned up

    The directory is created on first use, remembered on the current
    process object, and a finalizer with exit priority -100 is registered
    to remove it with shutil.rmtree.
    '''
    if current_process()._tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        current_process()._tempdir = tempdir
    return current_process()._tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (registration index, id(obj), func) -> obj.  Values are held weakly,
# so an entry disappears once its object is garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()


def _run_after_forkers():
    '''
    Call every registered after-fork function in registration order

    Exceptions raised by a callback are logged at INFO level and do not
    stop the remaining callbacks from running.
    '''
    items = list(_afterfork_registry.items())
    # The registration index is unique, so sorting orders by it alone.
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)


def register_after_fork(obj, func):
    '''
    Arrange for `func(obj)` to be called by _run_after_forkers()

    Only a weak reference to `obj` is kept.
    '''
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

# Maps (exitpriority, creation index) -> Finalize instance.
_finalizer_registry = {}
_finalizer_counter = itertools.count()

try:
    _integer_types = (int, long)
except NameError:
    # No separate `long` type on this interpreter.
    _integer_types = (int,)


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs

    Calling the instance runs `callback(*args, **kwargs)` at most once.
    When `obj` is not None the instance is also installed as the weakref
    callback of `obj`, so it runs when `obj` is garbage collected.
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        # Kept as asserts so callers still see AssertionError on misuse.
        assert exitpriority is None or \
            isinstance(exitpriority, _integer_types)

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            # Without an object the finalizer can only fire at exit, so a
            # priority is mandatory.
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        # Remembered so a forked child does not run the parent's finalizer.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None when the callback was
        skipped.  `wr` is accepted (and ignored) so the instance can be
        used as a weakref callback.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != os.getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references so the callback and its arguments can be
            # collected.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        # AttributeError: created with obj=None (no _weakref attribute).
        # TypeError: _weakref was reset to None by __call__()/cancel().
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0][0] is not None
    else:
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).  Take a
    # snapshot first so the filter never iterates the live dict.

    items = [x for x in list(_finalizer_registry.items()) if f(x)]
    # Keys are unique (priority, creation index) pairs; sort on the key
    # alone so Finalize instances are never compared with each other.
    items.sort(key=lambda item: item[0], reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # `_exiting is None` presumably covers the case where this module's
    # globals have already been cleared during interpreter teardown.
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    '''
    Exit handler: run finalizers and terminate/join child processes

    Order of work: finalizers with priority >= 0, terminate daemonic
    children, join all remaining children, then every remaining finalizer
    that has a priority.
    '''
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    # Record that shutdown has started so is_exiting() reports it.
    _exiting = True

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # NB: we check if the current process is None here because if
        # it's None, any call to ``active_children()`` will throw an
        # AttributeError (active_children winds up trying to get
        # attributes from util._current_process).  This happens in a
        # variety of shutdown circumstances that are not well-understood
        # because module-scope variables are not apparently supposed to
        # be destroyed until after this function is called.  However,
        # they are indeed destroyed before this function is called.  See
        # issues 9775 and 15881.  Also related: 4106, 9205, and 9207.

        for p in active_children():
            if p._daemonic:
                info('calling terminate() for daemon %s', p.name)
                p._popen.terminate()

        for p in active_children():
            info('calling join() for process %s', p.name)
            p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    '''
    Wrapper around threading.Lock which is replaced by a fresh lock when
    the after-fork callbacks run
    '''
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        # Rebind acquire/release so they always refer to the current lock.
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release


class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose attributes are cleared when the
    after-fork callbacks run
    '''
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())

    def __reduce__(self):
        # Pickle as "construct a new, empty instance of the same type".
        return type(self), ()