1# 2# Module providing various facilities to other parts of the package 3# 4# multiprocessing/util.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10import os 11import itertools 12import sys 13import weakref 14import atexit 15import threading # we want threading to install it's 16 # cleanup function before multiprocessing does 17from subprocess import _args_from_interpreter_flags 18 19from . import process 20 21__all__ = [ 22 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', 23 'log_to_stderr', 'get_temp_dir', 'register_after_fork', 24 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal', 25 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING', 26 ] 27 28# 29# Logging 30# 31 32NOTSET = 0 33SUBDEBUG = 5 34DEBUG = 10 35INFO = 20 36SUBWARNING = 25 37 38LOGGER_NAME = 'multiprocessing' 39DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s' 40 41_logger = None 42_log_to_stderr = False 43 44def sub_debug(msg, *args): 45 if _logger: 46 _logger.log(SUBDEBUG, msg, *args) 47 48def debug(msg, *args): 49 if _logger: 50 _logger.log(DEBUG, msg, *args) 51 52def info(msg, *args): 53 if _logger: 54 _logger.log(INFO, msg, *args) 55 56def sub_warning(msg, *args): 57 if _logger: 58 _logger.log(SUBWARNING, msg, *args) 59 60def get_logger(): 61 ''' 62 Returns logger used by multiprocessing 63 ''' 64 global _logger 65 import logging 66 67 logging._acquireLock() 68 try: 69 if not _logger: 70 71 _logger = logging.getLogger(LOGGER_NAME) 72 _logger.propagate = 0 73 74 # XXX multiprocessing should cleanup before logging 75 if hasattr(atexit, 'unregister'): 76 atexit.unregister(_exit_function) 77 atexit.register(_exit_function) 78 else: 79 atexit._exithandlers.remove((_exit_function, (), {})) 80 atexit._exithandlers.append((_exit_function, (), {})) 81 82 finally: 83 logging._releaseLock() 84 85 return _logger 86 87def log_to_stderr(level=None): 88 ''' 89 Turn on logging and add a handler which prints to stderr 90 ''' 91 global _log_to_stderr 92 import logging 93 94 logger = get_logger() 95 formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT) 96 handler = logging.StreamHandler() 97 handler.setFormatter(formatter) 98 logger.addHandler(handler) 99 100 if level: 101 logger.setLevel(level) 102 _log_to_stderr = True 103 return _logger 104 105# 106# Function returning a temp directory which will be removed on exit 107# 108 109def _remove_temp_dir(rmtree, tempdir): 110 rmtree(tempdir) 111 112 current_process = process.current_process() 113 # current_process() can be None if the finalizer is called 114 # late during Python finalization 115 if current_process is not None: 116 current_process._config['tempdir'] = None 117 118def get_temp_dir(): 119 # get name of a temp directory which will be automatically cleaned up 120 tempdir = process.current_process()._config.get('tempdir') 121 if tempdir is None: 122 import shutil, tempfile 123 tempdir = tempfile.mkdtemp(prefix='pymp-') 124 info('created temp directory %s', tempdir) 125 # keep a strong reference to shutil.rmtree(), since the finalizer 126 # can be called late during Python shutdown 127 Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir), 128 exitpriority=-100) 129 process.current_process()._config['tempdir'] = tempdir 130 return tempdir 131 132# 133# Support for reinitialization of objects when bootstrapping a child process 134# 135 136_afterfork_registry = weakref.WeakValueDictionary() 137_afterfork_counter = itertools.count() 138 139def _run_after_forkers(): 140 items = list(_afterfork_registry.items()) 141 items.sort() 142 for (index, ident, func), obj in items: 143 try: 144 func(obj) 145 except Exception as e: 146 info('after forker raised exception %s', e) 147 148def register_after_fork(obj, func): 149 _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj 150 151# 152# Finalization using weakrefs 153# 154 155_finalizer_registry = {} 156_finalizer_counter = itertools.count() 157 158 159class Finalize(object): 160 ''' 161 Class which supports object finalization using weakrefs 162 ''' 163 def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): 164 if (exitpriority is not None) and not isinstance(exitpriority,int): 165 raise TypeError( 166 "Exitpriority ({0!r}) must be None or int, not {1!s}".format( 167 exitpriority, type(exitpriority))) 168 169 if obj is not None: 170 self._weakref = weakref.ref(obj, self) 171 elif exitpriority is None: 172 raise ValueError("Without object, exitpriority cannot be None") 173 174 self._callback = callback 175 self._args = args 176 self._kwargs = kwargs or {} 177 self._key = (exitpriority, next(_finalizer_counter)) 178 self._pid = os.getpid() 179 180 _finalizer_registry[self._key] = self 181 182 def __call__(self, wr=None, 183 # Need to bind these locally because the globals can have 184 # been cleared at shutdown 185 _finalizer_registry=_finalizer_registry, 186 sub_debug=sub_debug, getpid=os.getpid): 187 ''' 188 Run the callback unless it has already been called or cancelled 189 ''' 190 try: 191 del _finalizer_registry[self._key] 192 except KeyError: 193 sub_debug('finalizer no longer registered') 194 else: 195 if self._pid != getpid(): 196 sub_debug('finalizer ignored because different process') 197 res = None 198 else: 199 sub_debug('finalizer calling %s with args %s and kwargs %s', 200 self._callback, self._args, self._kwargs) 201 res = self._callback(*self._args, **self._kwargs) 202 self._weakref = self._callback = self._args = \ 203 self._kwargs = self._key = None 204 return res 205 206 def cancel(self): 207 ''' 208 Cancel finalization of the object 209 ''' 210 try: 211 del _finalizer_registry[self._key] 212 except KeyError: 213 pass 214 else: 215 self._weakref = self._callback = self._args = \ 216 self._kwargs = self._key = None 217 218 def still_active(self): 219 ''' 220 Return whether this finalizer is still waiting to invoke callback 221 ''' 222 return self._key in _finalizer_registry 223 224 def __repr__(self): 225 try: 226 obj = self._weakref() 227 except (AttributeError, TypeError): 228 obj = None 229 230 if obj is None: 231 return '<%s object, dead>' % self.__class__.__name__ 232 233 x = '<%s object, callback=%s' % ( 234 self.__class__.__name__, 235 getattr(self._callback, '__name__', self._callback)) 236 if self._args: 237 x += ', args=' + str(self._args) 238 if self._kwargs: 239 x += ', kwargs=' + str(self._kwargs) 240 if self._key[0] is not None: 241 x += ', exitpriority=' + str(self._key[0]) 242 return x + '>' 243 244 245def _run_finalizers(minpriority=None): 246 ''' 247 Run all finalizers whose exit priority is not None and at least minpriority 248 249 Finalizers with highest priority are called first; finalizers with 250 the same priority will be called in reverse order of creation. 251 ''' 252 if _finalizer_registry is None: 253 # This function may be called after this module's globals are 254 # destroyed. See the _exit_function function in this module for more 255 # notes. 256 return 257 258 if minpriority is None: 259 f = lambda p : p[0] is not None 260 else: 261 f = lambda p : p[0] is not None and p[0] >= minpriority 262 263 # Careful: _finalizer_registry may be mutated while this function 264 # is running (either by a GC run or by another thread). 265 266 # list(_finalizer_registry) should be atomic, while 267 # list(_finalizer_registry.items()) is not. 268 keys = [key for key in list(_finalizer_registry) if f(key)] 269 keys.sort(reverse=True) 270 271 for key in keys: 272 finalizer = _finalizer_registry.get(key) 273 # key may have been removed from the registry 274 if finalizer is not None: 275 sub_debug('calling %s', finalizer) 276 try: 277 finalizer() 278 except Exception: 279 import traceback 280 traceback.print_exc() 281 282 if minpriority is None: 283 _finalizer_registry.clear() 284 285# 286# Clean up on exit 287# 288 289def is_exiting(): 290 ''' 291 Returns true if the process is shutting down 292 ''' 293 return _exiting or _exiting is None 294 295_exiting = False 296 297def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, 298 active_children=process.active_children, 299 current_process=process.current_process): 300 # We hold on to references to functions in the arglist due to the 301 # situation described below, where this function is called after this 302 # module's globals are destroyed. 303 304 global _exiting 305 306 if not _exiting: 307 _exiting = True 308 309 info('process shutting down') 310 debug('running all "atexit" finalizers with priority >= 0') 311 _run_finalizers(0) 312 313 if current_process() is not None: 314 # We check if the current process is None here because if 315 # it's None, any call to ``active_children()`` will raise 316 # an AttributeError (active_children winds up trying to 317 # get attributes from util._current_process). One 318 # situation where this can happen is if someone has 319 # manipulated sys.modules, causing this module to be 320 # garbage collected. The destructor for the module type 321 # then replaces all values in the module dict with None. 322 # For instance, after setuptools runs a test it replaces 323 # sys.modules with a copy created earlier. See issues 324 # #9775 and #15881. Also related: #4106, #9205, and 325 # #9207. 326 327 for p in active_children(): 328 if p.daemon: 329 info('calling terminate() for daemon %s', p.name) 330 p._popen.terminate() 331 332 for p in active_children(): 333 info('calling join() for process %s', p.name) 334 p.join() 335 336 debug('running the remaining "atexit" finalizers') 337 _run_finalizers() 338 339atexit.register(_exit_function) 340 341# 342# Some fork aware types 343# 344 345class ForkAwareThreadLock(object): 346 def __init__(self): 347 self._reset() 348 register_after_fork(self, ForkAwareThreadLock._reset) 349 350 def _reset(self): 351 self._lock = threading.Lock() 352 self.acquire = self._lock.acquire 353 self.release = self._lock.release 354 355 def __enter__(self): 356 return self._lock.__enter__() 357 358 def __exit__(self, *args): 359 return self._lock.__exit__(*args) 360 361 362class ForkAwareLocal(threading.local): 363 def __init__(self): 364 register_after_fork(self, lambda obj : obj.__dict__.clear()) 365 def __reduce__(self): 366 return type(self), () 367 368# 369# Close fds except those specified 370# 371 372try: 373 MAXFD = os.sysconf("SC_OPEN_MAX") 374except Exception: 375 MAXFD = 256 376 377def close_all_fds_except(fds): 378 fds = list(fds) + [-1, MAXFD] 379 fds.sort() 380 assert fds[-1] == MAXFD, 'fd too large' 381 for i in range(len(fds) - 1): 382 os.closerange(fds[i]+1, fds[i+1]) 383# 384# Close sys.stdin and replace stdin with os.devnull 385# 386 387def _close_stdin(): 388 if sys.stdin is None: 389 return 390 391 try: 392 sys.stdin.close() 393 except (OSError, ValueError): 394 pass 395 396 try: 397 fd = os.open(os.devnull, os.O_RDONLY) 398 try: 399 sys.stdin = open(fd, closefd=False) 400 except: 401 os.close(fd) 402 raise 403 except (OSError, ValueError): 404 pass 405 406# 407# Flush standard streams, if any 408# 409 410def _flush_std_streams(): 411 try: 412 sys.stdout.flush() 413 except (AttributeError, ValueError): 414 pass 415 try: 416 sys.stderr.flush() 417 except (AttributeError, ValueError): 418 pass 419 420# 421# Start a program with only specified fds kept open 422# 423 424def spawnv_passfds(path, args, passfds): 425 import _posixsubprocess 426 passfds = tuple(sorted(map(int, passfds))) 427 errpipe_read, errpipe_write = os.pipe() 428 try: 429 return _posixsubprocess.fork_exec( 430 args, [os.fsencode(path)], True, passfds, None, None, 431 -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write, 432 False, False, None) 433 finally: 434 os.close(errpipe_read) 435 os.close(errpipe_write) 436 437 438def close_fds(*fds): 439 """Close each file descriptor given as an argument""" 440 for fd in fds: 441 os.close(fd) 442 443 444def _cleanup_tests(): 445 """Cleanup multiprocessing resources when multiprocessing tests 446 completed.""" 447 448 from test import support 449 450 # cleanup multiprocessing 451 process._cleanup() 452 453 # Stop the ForkServer process if it's running 454 from multiprocessing import forkserver 455 forkserver._forkserver._stop() 456 457 # Stop the ResourceTracker process if it's running 458 from multiprocessing import resource_tracker 459 resource_tracker._resource_tracker._stop() 460 461 # bpo-37421: Explicitly call _run_finalizers() to remove immediately 462 # temporary directories created by multiprocessing.util.get_temp_dir(). 463 _run_finalizers() 464 support.gc_collect() 465 466 support.reap_children() 467