#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                         # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()

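# A minimal sketch of how callers use the Finalize class below (see
# get_temp_dir() above for a real example): the callback runs when ``obj``
# is garbage collected, or at interpreter exit if ``exitpriority`` is not
# None (higher priorities run first).  The names here are placeholders and
# the snippet is illustrative only, so it is not executed:
#
#     f = Finalize(conn, conn.close, exitpriority=10)
#     ...
#     if f.still_active():
#         f.cancel()              # drop the callback without running it
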
class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0][0] is not None
    else:
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority

    items = [x for x in list(_finalizer_registry.items()) if f(x)]
    items.sort(reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.
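
            # Daemonic children are terminated outright; every child still
            # active after that (daemonic or not) is then joined so it is
            # reaped before interpreter shutdown proceeds.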
            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])

#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    import _posixsubprocess
    passfds = sorted(passfds)
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)
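
# A minimal usage sketch for spawnv_passfds(), mirroring how other parts of
# multiprocessing (the fork server, for instance) launch helper processes:
# only the listed descriptors, plus the standard streams, stay open in the
# child.  The command line is a placeholder, not something multiprocessing
# actually runs, so the snippet is left as a comment:
#
#     r, w = os.pipe()
#     pid = spawnv_passfds(sys.executable,
#                          [sys.executable, '-c', 'import sys; sys.exit(0)'],
#                          passfds=[r, w])
#     os.waitpid(pid, 0)
#     os.close(r)
#     os.close(w)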