#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

# Numeric levels passed to ``Logger.log()`` by the helpers below.
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# The logger is created lazily by get_logger(); until then the helper
# functions below are no-ops.
_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    '''
    Log msg % args at level SUBDEBUG if the logger has been created
    '''
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    '''
    Log msg % args at level DEBUG if the logger has been created
    '''
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    '''
    Log msg % args at level INFO if the logger has been created
    '''
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    '''
    Log msg % args at level SUBWARNING if the logger has been created
    '''
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing

    The logger is created on first use (with propagation disabled) while
    holding the logging module's lock, and is cached in the module-level
    ``_logger`` for subsequent calls.
    '''
    global _logger
    import logging

    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            # Re-registering moves _exit_function to the end of the atexit
            # list, i.e. after whatever importing logging registered.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))

    finally:
        logging._releaseLock()

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr

    A StreamHandler using DEFAULT_LOGGING_FORMAT is attached to the
    multiprocessing logger.  If level is truthy the logger's level is set
    to it.  Returns the logger.
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger


# Abstract socket support

def _platform_supports_abstract_sockets():
    '''
    Return True when sys.platform is "linux" or sys.getandroidapilevel exists
    '''
    if sys.platform == "linux":
        return True
    if hasattr(sys, 'getandroidapilevel'):
        return True
    return False


def is_abstract_socket_namespace(address):
    '''
    Return whether address starts with a NUL byte / NUL character

    A falsy address (None, empty bytes or str) yields False.  Raises
    TypeError for a non-empty address that is neither bytes nor str.
    '''
    if not address:
        return False
    if isinstance(address, bytes):
        return address[0] == 0
    elif isinstance(address, str):
        return address[0] == "\0"
    # The message must be an f-string, otherwise the literal text
    # "{address!r}" is reported instead of the offending value.
    raise TypeError(f'address type of {address!r} unrecognized')


abstract_sockets_supported = _platform_supports_abstract_sockets()

#
# Function returning a temp directory which will be removed on exit
#

def _remove_temp_dir(rmtree, tempdir):
    '''
    Finalizer callback: remove tempdir using rmtree and forget it

    rmtree is passed in explicitly (see get_temp_dir) rather than being
    looked up at call time.
    '''
    rmtree(tempdir)

    current_process = process.current_process()
    # current_process() can be None if the finalizer is called
    # late during Python finalization
    if current_process is not None:
        current_process._config['tempdir'] = None

def get_temp_dir():
    '''
    Return the name of a temp directory which is cleaned up automatically

    The directory is created on first use, recorded in the current
    process's ``_config['tempdir']`` and reused by later calls.  Removal is
    scheduled through a Finalize with exitpriority -100.
    '''
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        # keep a strong reference to shutil.rmtree(), since the finalizer
        # can be called late during Python shutdown
        Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
                 exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (registration index, id(obj), func) -> obj.  Values are held weakly,
# so an entry disappears once its object is garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    '''
    Call func(obj) for every live registration, in registration order

    Exceptions raised by a callback are logged with info() and do not stop
    the remaining callbacks from running.
    '''
    items = list(_afterfork_registry.items())
    # The leading counter value in each key is unique, so sorting orders the
    # entries by registration and never has to compare the functions.
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    '''
    Register func to be called with obj by _run_after_forkers()

    Only a weak reference to obj is kept by the registry.
    '''
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

# Maps (exitpriority, creation index) -> Finalize instance.
_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        Register callback(*args, **kwargs) as a finalizer

        If obj is not None, the finalizer is installed as the callback of a
        weak reference to obj.  If obj is None, exitpriority must be given.

        Raises TypeError if exitpriority is neither None nor an int, and
        ValueError if both obj and exitpriority are None.
        '''
        if (exitpriority is not None) and not isinstance(exitpriority,int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        # Remember the creating process; __call__ skips the callback when
        # invoked from a process with a different pid.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled

        Returns the callback's result, or None when the callback is skipped
        because the finalizer is no longer registered or the current pid
        differs from the one recorded at creation.
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop every reference held by the finalizer once it has fired.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        '''
        Return a description of the finalizer

        The "dead" form is returned whenever no referent can be obtained
        from self._weakref (attribute missing, set to None, or referent
        already collected).
        '''
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<%s object, dead>' % self.__class__.__name__

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    def _eligible(key):
        # key is (exitpriority, creation index)
        priority = key[0]
        if priority is None:
            return False
        return minpriority is None or priority >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if _eligible(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # _exiting is also treated as "exiting" when it is None
    # (presumably after this module's globals have been cleared; see the
    # notes in _exit_function).
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    '''
    atexit handler: run finalizers and terminate/join child processes

    Runs at most once.  Order of operations: finalizers with priority >= 0,
    terminate() daemonic children, join() remaining children, then all
    remaining finalizers.
    '''
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    '''
    threading.Lock wrapper which reinitializes its lock after a fork
    '''
    def __init__(self):
        self._lock = threading.Lock()
        # Expose the underlying lock's bound methods directly.
        self.acquire = self._lock.acquire
        self.release = self._lock.release
        register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)

    def _at_fork_reinit(self):
        self._lock._at_fork_reinit()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    '''
    threading.local whose attribute dict is cleared by the after-fork hook
    '''
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        # Reconstructs as a fresh, empty instance of the same type.
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    '''
    Close every file descriptor in range(0, MAXFD) that is not in fds

    Fails with AssertionError ('fd too large') if any fd exceeds MAXFD.
    '''
    # Sentinels -1 and MAXFD bracket the list so that each gap between
    # consecutive kept fds can be closed with a single closerange() call.
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for low, high in zip(fds, fds[1:]):
        os.closerange(low + 1, high)
#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    '''
    Close sys.stdin and rebind it to a read-only handle on os.devnull

    Does nothing if sys.stdin is None.  OSError and ValueError from either
    step are ignored.
    '''
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, closefd=False)
        except BaseException:
            # Do not leak the descriptor if wrapping it fails for any reason.
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    '''
    Flush sys.stdout and sys.stderr, ignoring AttributeError and ValueError
    '''
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    '''
    Start the program at path with argument list args via fork_exec

    passfds is converted to a sorted tuple of ints and handed to
    _posixsubprocess.fork_exec() together with close_fds=True.  Returns
    whatever fork_exec() returns.
    '''
    # NOTE(review): the positional argument list below must match the
    # fork_exec() signature of the running interpreter version -- confirm
    # when porting to another Python release.
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None, None, None, -1, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)


def close_fds(*fds):
    """Close each file descriptor given as an argument"""
    for fd in fds:
        os.close(fd)


def _cleanup_tests():
    """Cleanup multiprocessing resources when multiprocessing tests
    completed."""

    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to remove immediately
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()