• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# On Unix we run a server process which keeps track of unlinked
3# semaphores. The server ignores SIGINT and SIGTERM and reads from a
4# pipe.  Every other process of the program has a copy of the writable
5# end of the pipe, so we get EOF when all other processes have exited.
6# Then the server process unlinks any remaining semaphore names.
7#
8# This is important because the system only supports a limited number
9# of named semaphores, and they will not be automatically removed till
10# the next reboot.  Without this semaphore tracker process, "killall
11# python" would probably leave unlinked semaphores.
12#
13
14import os
15import signal
16import sys
17import threading
18import warnings
19import _multiprocessing
20
21from . import spawn
22from . import util
23
24__all__ = ['ensure_running', 'register', 'unregister']
25
26
27class SemaphoreTracker(object):
28
29    def __init__(self):
30        self._lock = threading.Lock()
31        self._fd = None
32        self._pid = None
33
34    def getfd(self):
35        self.ensure_running()
36        return self._fd
37
38    def ensure_running(self):
39        '''Make sure that semaphore tracker process is running.
40
41        This can be run from any process.  Usually a child process will use
42        the semaphore created by its parent.'''
43        with self._lock:
44            if self._pid is not None:
45                # semaphore tracker was launched before, is it still running?
46                pid, status = os.waitpid(self._pid, os.WNOHANG)
47                if not pid:
48                    # => still alive
49                    return
50                # => dead, launch it again
51                os.close(self._fd)
52                self._fd = None
53                self._pid = None
54
55                warnings.warn('semaphore_tracker: process died unexpectedly, '
56                              'relaunching.  Some semaphores might leak.')
57
58            fds_to_pass = []
59            try:
60                fds_to_pass.append(sys.stderr.fileno())
61            except Exception:
62                pass
63            cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
64            r, w = os.pipe()
65            try:
66                fds_to_pass.append(r)
67                # process will out live us, so no need to wait on pid
68                exe = spawn.get_executable()
69                args = [exe] + util._args_from_interpreter_flags()
70                args += ['-c', cmd % r]
71                pid = util.spawnv_passfds(exe, args, fds_to_pass)
72            except:
73                os.close(w)
74                raise
75            else:
76                self._fd = w
77                self._pid = pid
78            finally:
79                os.close(r)
80
81    def register(self, name):
82        '''Register name of semaphore with semaphore tracker.'''
83        self._send('REGISTER', name)
84
85    def unregister(self, name):
86        '''Unregister name of semaphore with semaphore tracker.'''
87        self._send('UNREGISTER', name)
88
89    def _send(self, cmd, name):
90        self.ensure_running()
91        msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
92        if len(name) > 512:
93            # posix guarantees that writes to a pipe of less than PIPE_BUF
94            # bytes are atomic, and that PIPE_BUF >= 512
95            raise ValueError('name too long')
96        nbytes = os.write(self._fd, msg)
97        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
98            nbytes, len(msg))
99
100
101_semaphore_tracker = SemaphoreTracker()
102ensure_running = _semaphore_tracker.ensure_running
103register = _semaphore_tracker.register
104unregister = _semaphore_tracker.unregister
105getfd = _semaphore_tracker.getfd
106
107
108def main(fd):
109    '''Run semaphore tracker.'''
110    # protect the process from ^C and "killall python" etc
111    signal.signal(signal.SIGINT, signal.SIG_IGN)
112    signal.signal(signal.SIGTERM, signal.SIG_IGN)
113
114    for f in (sys.stdin, sys.stdout):
115        try:
116            f.close()
117        except Exception:
118            pass
119
120    cache = set()
121    try:
122        # keep track of registered/unregistered semaphores
123        with open(fd, 'rb') as f:
124            for line in f:
125                try:
126                    cmd, name = line.strip().split(b':')
127                    if cmd == b'REGISTER':
128                        cache.add(name)
129                    elif cmd == b'UNREGISTER':
130                        cache.remove(name)
131                    else:
132                        raise RuntimeError('unrecognized command %r' % cmd)
133                except Exception:
134                    try:
135                        sys.excepthook(*sys.exc_info())
136                    except:
137                        pass
138    finally:
139        # all processes have terminated; cleanup any remaining semaphores
140        if cache:
141            try:
142                warnings.warn('semaphore_tracker: There appear to be %d '
143                              'leaked semaphores to clean up at shutdown' %
144                              len(cache))
145            except Exception:
146                pass
147        for name in cache:
148            # For some reason the process which created and registered this
149            # semaphore has failed to unregister it. Presumably it has died.
150            # We therefore unlink it.
151            try:
152                name = name.decode('ascii')
153                try:
154                    _multiprocessing.sem_unlink(name)
155                except Exception as e:
156                    warnings.warn('semaphore_tracker: %r: %s' % (name, e))
157            finally:
158                pass
159