• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1###############################################################################
2# Server process to keep track of unlinked resources (like shared memory
3# segments, semaphores etc.) and clean them.
4#
5# On Unix we run a server process which keeps track of unlinked
6# resources. The server ignores SIGINT and SIGTERM and reads from a
7# pipe.  Every other process of the program has a copy of the writable
8# end of the pipe, so we get EOF when all other processes have exited.
9# Then the server process unlinks any remaining resource names.
10#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM. If a Python process leaks such a
# resource, the resource will not be removed until the next reboot.  Without
# this resource tracker process, "killall python" would probably leave such
# resources behind without ever unlinking them.
17
18import os
19import signal
20import sys
21import threading
22import warnings
23
24from . import spawn
25from . import util
26
# Names exported by a star-import of this module.
__all__ = ['ensure_running', 'register', 'unregister']
28
# Whether this platform lets us manipulate the per-thread signal mask.
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
# Signals the tracker process must survive.
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Maps a resource type name to the callable used to clean up one resource of
# that type.  'noop' takes no argument and does nothing.
_CLEANUP_FUNCS = {
    'noop': lambda: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    # Named semaphores are cleaned up with sem_unlink().
    #
    # The Python build may have detected that POSIX named semaphores are
    # unavailable, in which case sem_unlink() does not exist.  No named
    # semaphore can have been opened then, so there is nothing to register.
    if hasattr(_multiprocessing, 'sem_unlink'):
        _CLEANUP_FUNCS['semaphore'] = _multiprocessing.sem_unlink
    _CLEANUP_FUNCS['shared_memory'] = _posixshmem.shm_unlink
52
53
class ResourceTracker:
    '''Client-side handle on the resource tracker process.

    Holds the writable end of the pipe to the tracker (``_fd``) and, when
    this process spawned the tracker itself, the tracker's pid (``_pid``).
    Messages are sent as single ASCII lines of the form ``CMD:NAME:RTYPE``.
    '''

    def __init__(self):
        # Serialises launching/stopping the tracker.
        self._lock = threading.Lock()
        # Writable end of the pipe to the tracker; None when none is known.
        self._fd = None
        # Pid of the tracker if this process spawned it; None otherwise.
        self._pid = None

    def _stop(self):
        '''Stop the tracker by closing our pipe end, then reap it if possible.

        Does nothing when no tracker is known to this process.
        '''
        with self._lock:
            if self._fd is None:
                # not running
                return

            # closing the "alive" file descriptor stops main()
            os.close(self._fd)
            self._fd = None

            # _pid can be None if this process is a child from another
            # python process, which has started the resource_tracker; in
            # that case there is nothing for us to wait on.
            if self._pid is not None:
                try:
                    os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._pid = None

    def getfd(self):
        '''Return the writable pipe fd, launching the tracker if needed.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.

        If a previously known tracker no longer accepts writes, it is reaped
        (when it is our child), a warning is issued and a new one is spawned.
        '''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

            fds_to_pass = []
            try:
                # Best effort: stderr may be missing or have no real fd.
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
            except BaseException:
                # Launch failed: do not leak the write end, then propagate.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the tracker; we never need it here.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.

        Returns True if the probe could be written, False on OSError.
        '''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises ValueError if the encoded message is longer than 512 bytes,
        and UnicodeEncodeError if it is not pure ASCII.  Validation happens
        before the tracker is launched, so a rejected message has no side
        effects.
        '''
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of at most PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512.  The bound applies
            # to the whole message, not just the name.
            raise ValueError('msg too long')
        self.ensure_running()
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))
171
172
# Single tracker handle shared by the whole process.
_resource_tracker = ResourceTracker()

# Module-level shortcuts bound to that shared handle.
getfd = _resource_tracker.getfd
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
unregister = _resource_tracker.unregister
178
def main(fd):
    '''Run resource tracker.

    Reads ``CMD:NAME:RTYPE`` lines from the file descriptor *fd* until EOF,
    maintaining a set of registered names per resource type.  CMD is one of
    REGISTER, UNREGISTER or PROBE.  A line that cannot be processed is
    reported through sys.excepthook and skipped.  Once the input ends, every
    name still registered is passed to the cleanup function of its type.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # Undo the mask the launcher installed around the spawn; the
        # handlers above are in place now.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    # The tracker only uses the pipe (and stderr for diagnostics).
    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    text = line.strip().decode('ascii')
                    # The command is the first field and the resource type
                    # the last; everything in between is the name, which may
                    # itself contain ':'.
                    cmd, rest = text.split(':', 1)
                    name, rtype = rest.rsplit(':', 1)
                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                    if cleanup_func is None:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # Report the bad line but keep serving; nothing raised
                    # by the hook itself may take the tracker down.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died.  We therefore unlink it.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                except Exception as e:
                    # warn() can itself raise when warnings are turned into
                    # errors; that must not stop the remaining cleanups.
                    try:
                        warnings.warn('resource_tracker: %r: %s' % (name, e))
                    except Exception:
                        pass
240