• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# We use a background thread for sharing fds on Unix, and for sharing sockets on
3# Windows.
4#
5# A client which wants to pickle a resource registers it with the resource
6# sharer and gets an identifier in return.  The unpickling process will connect
7# to the resource sharer, sends the identifier and its pid, and then receives
8# the resource.
9#
10
11import os
12import signal
13import socket
14import sys
15import threading
16
17from . import process
18from .context import reduction
19from . import util
20
21__all__ = ['stop']
22
23
24if sys.platform == 'win32':
25    __all__ += ['DupSocket']
26
27    class DupSocket(object):
28        '''Picklable wrapper for a socket.'''
29        def __init__(self, sock):
30            new_sock = sock.dup()
31            def send(conn, pid):
32                share = new_sock.share(pid)
33                conn.send_bytes(share)
34            self._id = _resource_sharer.register(send, new_sock.close)
35
36        def detach(self):
37            '''Get the socket.  This should only be called once.'''
38            with _resource_sharer.get_connection(self._id) as conn:
39                share = conn.recv_bytes()
40                return socket.fromshare(share)
41
42else:
43    __all__ += ['DupFd']
44
45    class DupFd(object):
46        '''Wrapper for fd which can be used at any time.'''
47        def __init__(self, fd):
48            new_fd = os.dup(fd)
49            def send(conn, pid):
50                reduction.send_handle(conn, new_fd, pid)
51            def close():
52                os.close(new_fd)
53            self._id = _resource_sharer.register(send, close)
54
55        def detach(self):
56            '''Get the fd.  This should only be called once.'''
57            with _resource_sharer.get_connection(self._id) as conn:
58                return reduction.recv_handle(conn)
59
60
61class _ResourceSharer(object):
62    '''Manager for resouces using background thread.'''
63    def __init__(self):
64        self._key = 0
65        self._cache = {}
66        self._old_locks = []
67        self._lock = threading.Lock()
68        self._listener = None
69        self._address = None
70        self._thread = None
71        util.register_after_fork(self, _ResourceSharer._afterfork)
72
73    def register(self, send, close):
74        '''Register resource, returning an identifier.'''
75        with self._lock:
76            if self._address is None:
77                self._start()
78            self._key += 1
79            self._cache[self._key] = (send, close)
80            return (self._address, self._key)
81
82    @staticmethod
83    def get_connection(ident):
84        '''Return connection from which to receive identified resource.'''
85        from .connection import Client
86        address, key = ident
87        c = Client(address, authkey=process.current_process().authkey)
88        c.send((key, os.getpid()))
89        return c
90
91    def stop(self, timeout=None):
92        '''Stop the background thread and clear registered resources.'''
93        from .connection import Client
94        with self._lock:
95            if self._address is not None:
96                c = Client(self._address,
97                           authkey=process.current_process().authkey)
98                c.send(None)
99                c.close()
100                self._thread.join(timeout)
101                if self._thread.is_alive():
102                    util.sub_warning('_ResourceSharer thread did '
103                                     'not stop when asked')
104                self._listener.close()
105                self._thread = None
106                self._address = None
107                self._listener = None
108                for key, (send, close) in self._cache.items():
109                    close()
110                self._cache.clear()
111
112    def _afterfork(self):
113        for key, (send, close) in self._cache.items():
114            close()
115        self._cache.clear()
116        # If self._lock was locked at the time of the fork, it may be broken
117        # -- see issue 6721.  Replace it without letting it be gc'ed.
118        self._old_locks.append(self._lock)
119        self._lock = threading.Lock()
120        if self._listener is not None:
121            self._listener.close()
122        self._listener = None
123        self._address = None
124        self._thread = None
125
126    def _start(self):
127        from .connection import Listener
128        assert self._listener is None
129        util.debug('starting listener and thread for sending handles')
130        self._listener = Listener(authkey=process.current_process().authkey)
131        self._address = self._listener.address
132        t = threading.Thread(target=self._serve)
133        t.daemon = True
134        t.start()
135        self._thread = t
136
137    def _serve(self):
138        if hasattr(signal, 'pthread_sigmask'):
139            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
140        while 1:
141            try:
142                with self._listener.accept() as conn:
143                    msg = conn.recv()
144                    if msg is None:
145                        break
146                    key, destination_pid = msg
147                    send, close = self._cache.pop(key)
148                    try:
149                        send(conn, destination_pid)
150                    finally:
151                        close()
152            except:
153                if not util.is_exiting():
154                    sys.excepthook(*sys.exc_info())
155
156
157_resource_sharer = _ResourceSharer()
158stop = _resource_sharer.stop
159