###############################################################################
# Server process to keep track of unlinked resources (like shared memory
# segments, semaphores etc.) and clean them.
#
# On Unix we run a server process which keeps track of unlinked
# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe. Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in the RAM. If a python process leaks such a
# resource, this resource will not be removed till the next reboot. Without
# this resource tracker process, "killall python" would probably leave unlinked
# resources.

import os
import signal
import sys
import threading
import warnings

from . import spawn
from . import util

__all__ = ['ensure_running', 'register', 'unregister']

_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Largest message (in bytes) that may be written to the tracker pipe in one
# os.write() call.  posix guarantees that writes to a pipe of less than
# PIPE_BUF bytes are atomic, and that PIPE_BUF >= 512.
_MAX_MSG_SIZE = 512

# Maps a resource type name (as sent over the pipe) to the callable used to
# unlink a leaked resource of that type.  'noop' is the type used by the
# liveness probe and never has anything to clean up.
_CLEANUP_FUNCS = {
    'noop': lambda: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    _CLEANUP_FUNCS.update({
        'semaphore': _multiprocessing.sem_unlink,
        'shared_memory': _posixshmem.shm_unlink,
    })


class ResourceTracker(object):
    '''Client-side handle on the resource tracker process.

    Holds the writable end of the pipe to the tracker (``_fd``) and, when
    this process spawned the tracker itself, the tracker's pid (``_pid``).
    '''

    def __init__(self):
        # Serialises launching / stopping of the tracker process.
        self._lock = threading.Lock()
        # Writable end of the pipe to the tracker, or None if not running.
        self._fd = None
        # Pid of the tracker if this process spawned it, else None.
        self._pid = None

    def _stop(self):
        '''Stop the tracker by closing our pipe end, then reap it.

        Does nothing if no tracker is known to this process.
        '''
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            # _pid can be None if this process is a child from another
            # python process, which has started the resource_tracker; in
            # that case there is no child of ours to wait for.
            if self._pid is not None:
                os.waitpid(self._pid, 0)
            self._pid = None

    def getfd(self):
        '''Return the writable pipe fd, launching the tracker if needed.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching. Some resources might leak.')

            fds_to_pass = []
            try:
                # Let the tracker report problems on our stderr when it has
                # a real file descriptor.
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK,
                                               _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                               _IGNORED_SIGNALS)
            except BaseException:
                # Spawning failed: do not leak the write end of the pipe.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the tracker; we never need it.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker pipe.

        Launches the tracker first if it is not running.

        Raises:
            ValueError: if the encoded message is longer than 512 bytes.
            UnicodeEncodeError: if the message is not pure ASCII.
        '''
        self.ensure_running()
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > _MAX_MSG_SIZE:
            # posix guarantees that writes to a pipe of less than PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  It is the whole
            # message, not just the name, that has to fit in one write.
            raise ValueError('name too long')
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))


_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd


def main(fd):
    '''Run resource tracker.

    Reads ``CMD:name:rtype`` lines from *fd* (the readable end of the pipe)
    until EOF, tracking which names are currently registered per resource
    type, then unlinks whatever is still registered.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The parent blocked these signals around the spawn; now that they
        # are ignored the inherited mask can be lifted.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    cmd, name, rtype = line.strip().decode('ascii').split(':')
                    cleanup_func = _CLEANUP_FUNCS.get(rtype)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # A bad line must not kill the tracker: report it on a
                    # best-effort basis and keep reading.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))