• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
###############################################################################
# Server process to keep track of resources that have not been unlinked yet
# (like shared memory segments, semaphores, etc.) and clean them up.
#
# On Unix we run a server process which keeps track of such resources.
# The server ignores SIGINT and SIGTERM and reads from a pipe.  Every other
# process of the program has a copy of the writable end of the pipe, so we get
# EOF when all other processes have exited.  Then the server process unlinks
# any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM.  If a Python process leaks such a
# resource, the resource will not be removed until the next reboot.  Without
# this resource tracker process, "killall python" would probably leave such
# resources behind without ever unlinking them.
17
18import os
19import signal
20import sys
21import threading
22import warnings
23
24from . import spawn
25from . import util
26
# Public API of this module.  The three names are bound methods of the
# module-level ResourceTracker singleton.
__all__ = [
    'ensure_running',
    'register',
    'unregister',
]

# signal.pthread_sigmask is not provided on every platform; callers check this
# flag before blocking/unblocking signals.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')

# Signals the tracker ignores.  They are also blocked while the tracker is
# being spawned so it cannot be killed before installing its handlers.
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Resource type -> function that unlinks a leaked resource of that type.
# The tracker calls every entry as ``func(name)``.  'noop' is the type used by
# liveness probes; it accepts and ignores any arguments so that it follows the
# same calling convention as the real cleanup functions.
_CLEANUP_FUNCS = {
    'noop': lambda *args: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    _CLEANUP_FUNCS.update({
        'semaphore': _multiprocessing.sem_unlink,
        'shared_memory': _posixshmem.shm_unlink,
    })


class ResourceTracker(object):
    '''Client side of the resource tracker.

    Holds the write end of the pipe to the tracker server process (``_fd``)
    and, when this process spawned the server itself, the server's pid
    (``_pid``).  ``_lock`` serializes starting and stopping the server.
    '''

    def __init__(self):
        self._lock = threading.Lock()
        self._fd = None   # write end of the pipe to the tracker, or None
        self._pid = None  # tracker pid if this process spawned it, else None

    def _stop(self):
        '''Stop the tracker by closing our end of the pipe, then reap it.

        Closing the write end lets ``main()`` reach EOF once no other
        process holds it.  As in ``ensure_running()``, ``_pid`` may be None
        (the tracker was started by another process), or the tracker may not
        be reapable by us; neither case is treated as an error.
        '''
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            try:
                if self._pid is not None:
                    os.waitpid(self._pid, 0)
            except ChildProcessError:
                # Not our child, or it has already been reaped.
                pass
            self._pid = None

    def getfd(self):
        '''Return the pipe write end, launching the tracker if needed.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.  If a previously launched tracker
        is found dead, it is reaped (when it is our child), a warning is
        emitted, and a new tracker is spawned.'''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

            # Descriptors the child keeps open: stderr (when it has one) so
            # the tracker can report problems, and the pipe's read end.
            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # The tracker may outlive this process; it is only reaped by
                # _stop() or when it is found dead above.
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
            except BaseException:
                # Spawning failed: don't leak the write end we would have kept.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # Only the child needs the read end.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.

        Returns False if writing the probe raises OSError (e.g. the tracker
        has exited and the read end is gone), True otherwise.'''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker.

        The tracker is launched first if it is not running.

        Raises:
            UnicodeEncodeError: if the message is not ASCII.
            ValueError: if the encoded message is longer than 512 bytes.
        '''
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of at most PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The limit must
            # cover the whole message (not just the name) so that lines
            # written concurrently by several processes never interleave.
            raise ValueError('msg too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))

# One tracker per process; the module-level functions are simply bound
# methods of this singleton.
_resource_tracker = ResourceTracker()
ensure_running, register, unregister, getfd = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)


def _parse_message(line):
    '''Split one raw tracker message into ``(cmd, name, rtype)``.

    Messages have the form ``cmd:name:rtype``.  The commands (REGISTER,
    UNREGISTER, PROBE) and the resource types (the keys of _CLEANUP_FUNCS)
    contain no ``:``, so the command is split off at the first colon and the
    type at the last one.  Everything in between is the name, which may
    therefore itself contain colons.

    Raises:
        UnicodeDecodeError: if the line is not ASCII.
        ValueError: if the line does not contain two ``:`` separators.
    '''
    cmd, sep1, rest = line.strip().decode('ascii').partition(':')
    name, sep2, rtype = rest.rpartition(':')
    if not (sep1 and sep2):
        raise ValueError('malformed resource tracker message: %r' % (line,))
    return cmd, name, rtype


def main(fd):
    '''Run resource tracker.

    Read ``cmd:name:rtype`` lines from the pipe read end *fd* until EOF,
    keeping a set of registered names per resource type.  Once EOF is
    reached (or reading fails), unlink every name that is still registered
    using the matching entry of _CLEANUP_FUNCS.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The spawning process blocked these signals around the spawn
        # (bpo-33613); now that they are ignored, unblock them again.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    cmd, name, rtype = _parse_message(line)
                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # A bad message must not stop the tracker: report it and
                    # keep serving the remaining processes.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died.  We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))