#
# On Unix we run a server process which keeps track of unlinked
# semaphores. The server ignores SIGINT and SIGTERM and reads from a
# pipe.  Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining semaphore names.
#
# This is important because the system only supports a limited number
# of named semaphores, and they will not be automatically removed till
# the next reboot.  Without this semaphore tracker process, "killall
# python" would probably leave unlinked semaphores.
#

import os
import signal
import sys
import threading
import warnings
import _multiprocessing

from . import spawn
from . import util

__all__ = ['ensure_running', 'register', 'unregister']


class SemaphoreTracker(object):
    '''Client-side handle on the semaphore tracker process.

    Holds the write end of the pipe to the tracker (``_fd``) and the pid
    of the tracker process launched by this object (``_pid``).  Both are
    None until the tracker has been started.'''

    def __init__(self):
        # serializes launching / relaunching of the tracker process
        self._lock = threading.Lock()
        self._fd = None
        self._pid = None

    def getfd(self):
        '''Return the write end of the tracker pipe, starting the tracker
        process first if necessary.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that semaphore tracker process is running.

        This can be run from any process.  Usually a child process will use
        the semaphore created by its parent.'''
        with self._lock:
            if self._pid is not None:
                # semaphore tracker was launched before, is it still running?
                try:
                    pid, _ = os.waitpid(self._pid, os.WNOHANG)
                except ChildProcessError:
                    # The recorded pid is not a waitable child of this
                    # process (e.g. it was already reaped), so it cannot
                    # be confirmed alive: treat the tracker as gone.
                    pass
                else:
                    if not pid:
                        # => still alive
                        return
                # => dead, launch it again
                os.close(self._fd)
                self._fd = None
                self._pid = None

                warnings.warn('semaphore_tracker: process died unexpectedly, '
                              'relaunching.  Some semaphores might leak.')

            fds_to_pass = []
            try:
                # let the tracker report problems on our stderr, if any
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                pid = util.spawnv_passfds(exe, args, fds_to_pass)
            except BaseException:
                # launch failed: do not leak the write end, then re-raise
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # the read end belongs to the tracker process only
                os.close(r)

    def register(self, name):
        '''Register name of semaphore with semaphore tracker.'''
        self._send('REGISTER', name)

    def unregister(self, name):
        '''Unregister name of semaphore with semaphore tracker.'''
        self._send('UNREGISTER', name)

    def _send(self, cmd, name):
        '''Write the line "<cmd>:<name>" to the tracker pipe.

        Starts the tracker first if needed.  Raises ValueError when the
        encoded message is longer than 512 bytes, and UnicodeEncodeError
        when cmd or name is not ASCII.'''
        self.ensure_running()
        msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of at most PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The limit
            # applies to the bytes actually written, i.e. the whole
            # message and not just the name.
            raise ValueError('name too long')
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))


# Module-level singleton and the functions exported from it.
_semaphore_tracker = SemaphoreTracker()
ensure_running = _semaphore_tracker.ensure_running
register = _semaphore_tracker.register
unregister = _semaphore_tracker.unregister
getfd = _semaphore_tracker.getfd


def main(fd):
    '''Run semaphore tracker.

    fd is the file descriptor to read commands from.  Each line has the
    form "REGISTER:<name>" or "UNREGISTER:<name>".  When EOF is reached,
    every name that is still registered is unlinked.'''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    # close stdin and stdout; failures to do so are ignored
    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = set()
    try:
        # keep track of registered/unregistered semaphores
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    # split on the first colon only, so a name that itself
                    # contains ':' is still tracked
                    cmd, name = line.strip().split(b':', 1)
                    if cmd == b'REGISTER':
                        cache.add(name)
                    elif cmd == b'UNREGISTER':
                        cache.remove(name)
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # report the bad line but keep serving the pipe
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining semaphores
        if cache:
            try:
                warnings.warn('semaphore_tracker: There appear to be %d '
                              'leaked semaphores to clean up at shutdown' %
                              len(cache))
            except Exception:
                pass
        for name in cache:
            # For some reason the process which created and registered this
            # semaphore has failed to unregister it. Presumably it has died.
            # We therefore unlink it.
            try:
                # decoding happens inside the try so that one undecodable
                # entry does not abort the cleanup of the remaining names
                name = name.decode('ascii')
                _multiprocessing.sem_unlink(name)
            except Exception as e:
                warnings.warn('semaphore_tracker: %r: %s' % (name, e))