#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:
            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))
    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger

#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
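
# A minimal sketch of how register_after_fork() is typically used (the
# class name _Registry and its attributes are hypothetical, not part of
# this module).  The registered function is called with the object as its
# only argument by _run_after_forkers() while the child process
# bootstraps, much like ForkAwareThreadLock further below:
#
#     class _Registry(object):
#         def __init__(self):
#             self._lock = threading.Lock()
#             register_after_fork(self, _Registry._reinit)
#
#         def _reinit(self):
#             # give the forked child a fresh, unlocked lock
#             self._lock = threading.Lock()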

#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        if (exitpriority is not None) and not isinstance(exitpriority, int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
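
# A minimal usage sketch for Finalize (the names wrapper, _close_fd and fd
# are hypothetical; the None-object form mirrors the real call in
# get_temp_dir() above):
#
#     f = Finalize(wrapper, _close_fd, args=(fd,), exitpriority=10)
#     f.still_active()      # True until it has run or been cancelled
#     f.cancel()            # forget it without running the callback
#
#     Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
#
# Note that the callback and args must not hold strong references to the
# tracked object, otherwise the entry in _finalizer_registry keeps the
# object alive and the weakref callback can never fire before exit.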

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0] is not None
    else:
        f = lambda p : p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()
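
# Sketch of the resulting ordering at shutdown (the callback names are
# hypothetical):
#
#     Finalize(None, flush_buffers, exitpriority=20)
#     Finalize(None, remove_scratch_dir, exitpriority=-100)
#
# _exit_function() below first calls _run_finalizers(0), which runs
# flush_buffers (priority >= 0); only after the child processes have been
# joined does _run_finalizers() run remove_scratch_dir.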

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
        self._reset()
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for i in range(len(fds) - 1):
        os.closerange(fds[i]+1, fds[i+1])

#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, closefd=False)
        except:
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)
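
# A minimal sketch of close_all_fds_except() (pipe_fd is hypothetical): a
# freshly forked child that should keep only the standard streams and one
# inherited pipe could call
#
#     close_all_fds_except([0, 1, 2, pipe_fd])
#
# which uses os.closerange() to close every other descriptor below MAXFD.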