#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Module-wide logger; stays None (and the helpers below stay silent) until
# get_logger() has been called.
_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    '''
    Log msg at SUBDEBUG level if the multiprocessing logger exists
    '''
    if _logger:
        # stacklevel=2 attributes the record to our caller, not this helper
        _logger.log(SUBDEBUG, msg, *args, stacklevel=2)

def debug(msg, *args):
    '''
    Log msg at DEBUG level if the multiprocessing logger exists
    '''
    if _logger:
        _logger.log(DEBUG, msg, *args, stacklevel=2)

def info(msg, *args):
    '''
    Log msg at INFO level if the multiprocessing logger exists
    '''
    if _logger:
        _logger.log(INFO, msg, *args, stacklevel=2)

def sub_warning(msg, *args):
    '''
    Log msg at SUBWARNING level if the multiprocessing logger exists
    '''
    if _logger:
        _logger.log(SUBWARNING, msg, *args, stacklevel=2)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    with logging._lock:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            # Re-registering moves _exit_function to the end of the atexit
            # list, so it runs before handlers registered earlier.
            atexit.unregister(_exit_function)
            atexit.register(_exit_function)

    return _logger

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging

    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    if level:
        logger.setLevel(level)
    _log_to_stderr = True
    return _logger


# Abstract socket support

def _platform_supports_abstract_sockets():
    '''
    Return whether sys.platform is one of the platforms treated as
    supporting abstract sockets ("linux" or "android")
    '''
    return sys.platform in ("linux", "android")


def is_abstract_socket_namespace(address):
    '''
    Return whether address (bytes or str) starts with a NUL character

    An empty/false address yields False; any other type raises TypeError.
    '''
    if not address:
        return False
    if isinstance(address, bytes):
        return address[0] == 0
    elif isinstance(address, str):
        return address[0] == "\0"
    raise TypeError(f'address type of {address!r} unrecognized')


abstract_sockets_supported = _platform_supports_abstract_sockets()

#
# Function returning a temp directory which will be removed on exit
#

def _remove_temp_dir(rmtree, tempdir):
    '''
    Remove tempdir using the supplied rmtree callable and forget it in the
    current process configuration
    '''
    rmtree(tempdir)

    current_process = process.current_process()
    # current_process() can be None if the finalizer is called
    # late during Python finalization
    if current_process is not None:
        current_process._config['tempdir'] = None

def get_temp_dir():
    '''
    Return the name of a temp directory which will be automatically
    cleaned up; it is created on first use and cached in the current
    process configuration
    '''
    # get name of a temp directory which will be automatically cleaned up
    tempdir = process.current_process()._config.get('tempdir')
    if tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        # keep a strong reference to shutil.rmtree(), since the finalizer
        # can be called late during Python shutdown
        Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
                 exitpriority=-100)
        process.current_process()._config['tempdir'] = tempdir
    return tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

# Keys are (registration index, id(obj), func); values are the objects
# themselves, held weakly so registration does not keep them alive.
_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    '''
    Call every registered after-fork function on its object, in
    registration order; exceptions are logged and otherwise ignored
    '''
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception as e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    '''
    Register func to be called with obj by _run_after_forkers()
    '''
    _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
# Finalization using weakrefs
#

# Maps (exitpriority, creation index) -> Finalize instance.
_finalizer_registry = {}
_finalizer_counter = itertools.count()


class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        '''
        Register callback(*args, **kwargs) as a finalizer

        If obj is not None the finalizer is also triggered through a weak
        reference to obj.  Without an object, exitpriority must be given.
        Raises TypeError for a non-int exitpriority and ValueError when
        both obj and exitpriority are None.
        '''
        if (exitpriority is not None) and not isinstance(exitpriority,int):
            raise TypeError(
                "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
                    exitpriority, type(exitpriority)))

        if obj is not None:
            # The finalizer itself is the weakref callback.
            self._weakref = weakref.ref(obj, self)
        elif exitpriority is None:
            raise ValueError("Without object, exitpriority cannot be None")

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, next(_finalizer_counter))
        # Remembered so that the callback only runs in the creating process.
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None,
                 # Need to bind these locally because the globals can have
                 # been cleared at shutdown
                 _finalizer_registry=_finalizer_registry,
                 sub_debug=sub_debug, getpid=os.getpid):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != getpid():
                sub_debug('finalizer ignored because different process')
                res = None
            else:
                sub_debug('finalizer calling %s with args %s and kwargs %s',
                          self._callback, self._args, self._kwargs)
                res = self._callback(*self._args, **self._kwargs)
            # Drop all references so the finalizer cannot run twice.
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res

    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        dead = '<%s object, dead>' % self.__class__.__name__

        # _key is reset to None once the finalizer has run or been
        # cancelled (and is missing if __init__ failed early).
        key = getattr(self, '_key', None)
        if key is None:
            return dead

        # Finalizers created with obj=None never get a _weakref attribute;
        # they are not "dead" merely because there is no referent, so only
        # consult the weak reference when one exists.
        wr = getattr(self, '_weakref', None)
        if wr is not None and wr() is None:
            return dead

        x = '<%s object, callback=%s' % (
                self.__class__.__name__,
                getattr(self._callback, '__name__', self._callback))
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if key[0] is not None:
            x += ', exitpriority=' + str(key[0])
        return x + '>'


def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    if minpriority is None:
        f = lambda p : p[0] is not None
    else:
        f = lambda p : p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                import traceback
                traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    # _exiting becomes None once the module globals have been cleared.
    return _exiting or _exiting is None

_exiting = False

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    '''
    atexit handler: run finalizers with priority >= 0, terminate daemonic
    children, join the remaining children, then run the other finalizers
    '''
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.

            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)

#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    '''
    Wrapper around threading.Lock which registers an after-fork hook that
    reinitializes the underlying lock
    '''
    def __init__(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release
        register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)

    def _at_fork_reinit(self):
        self._lock._at_fork_reinit()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)


class ForkAwareLocal(threading.local):
    '''
    threading.local subclass whose attribute dict is cleared by an
    after-fork hook
    '''
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        # Pickles as a fresh, empty instance of the same type.
        return type(self), ()

#
# Close fds except those specified
#

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    MAXFD = 256

def close_all_fds_except(fds):
    '''
    Close every file descriptor in range(0, MAXFD) that is not in fds
    '''
    # -1 and MAXFD act as sentinels so that the gaps before the first and
    # after the last kept descriptor are closed as well.
    fds = list(fds) + [-1, MAXFD]
    fds.sort()
    assert fds[-1] == MAXFD, 'fd too large'
    for low, high in zip(fds, fds[1:]):
        os.closerange(low + 1, high)
#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    '''
    Close sys.stdin and rebind it to a read-only handle on os.devnull;
    OSError and ValueError are ignored at both steps
    '''
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            sys.stdin = open(fd, encoding="utf-8", closefd=False)
        except BaseException:
            # Do not leak the descriptor if wrapping it failed for any
            # reason; the exception is always re-raised.
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass

#
# Flush standard streams, if any
#

def _flush_std_streams():
    '''
    Flush sys.stdout and sys.stderr, ignoring missing or closed streams
    '''
    try:
        sys.stdout.flush()
    except (AttributeError, ValueError):
        pass
    try:
        sys.stderr.flush()
    except (AttributeError, ValueError):
        pass

#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    '''
    Start the program at path with argument list args via
    _posixsubprocess.fork_exec(), passing passfds as the descriptors to
    keep, and return fork_exec()'s result
    '''
    import _posixsubprocess
    import subprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [path], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, -1, None, None, None, -1, None,
            subprocess._USE_VFORK)
    finally:
        # The error pipe is not read here; just release both ends.
        os.close(errpipe_read)
        os.close(errpipe_write)


def close_fds(*fds):
    """Close each file descriptor given as an argument"""
    for fd in fds:
        os.close(fd)


def _cleanup_tests():
    """Cleanup multiprocessing resources when multiprocessing tests
    completed."""

    from test import support

    # cleanup multiprocessing
    process._cleanup()

    # Stop the ForkServer process if it's running
    from multiprocessing import forkserver
    forkserver._forkserver._stop()

    # Stop the ResourceTracker process if it's running
    from multiprocessing import resource_tracker
    resource_tracker._resource_tracker._stop()

    # bpo-37421: Explicitly call _run_finalizers() to remove immediately
    # temporary directories created by multiprocessing.util.get_temp_dir().
    _run_finalizers()
    support.gc_collect()

    support.reap_children()