#
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process will connect
# to the resource sharer, send the identifier and its pid, and then receive
# the resource.
#

import os
import signal
import socket
import sys
import threading

from . import process
from .context import reduction
from . import util

__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''

        def __init__(self, sock):
            '''Duplicate *sock* and register the duplicate with the sharer.'''
            # The duplicate (not the caller's socket) is what gets shared and
            # is closed by the sharer once it is no longer needed.
            new_sock = sock.dup()

            def send(conn, pid):
                share = new_sock.share(pid)
                conn.send_bytes(share)

            self._id = _resource_sharer.register(send, new_sock.close)

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            # The sharer removes the registration when it serves the request,
            # so a second call with the same identifier cannot succeed.
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''

        def __init__(self, fd):
            '''Duplicate *fd* and register the duplicate with the sharer.'''
            # The duplicate (not the caller's fd) is what gets sent and is
            # closed by the sharer once it is no longer needed.
            new_fd = os.dup(fd)

            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)

            def close():
                os.close(new_fd)

            self._id = _resource_sharer.register(send, close)

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            # The sharer removes the registration when it serves the request,
            # so a second call with the same identifier cannot succeed.
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer(object):
    '''Manager for resources using background thread.'''

    def __init__(self):
        '''Create an idle sharer; the listener starts on first register().'''
        # Last key handed out by register(); keys are consecutive integers.
        self._key = 0
        # Maps key -> (send, close) callbacks for resources not yet served.
        self._cache = {}
        # Locks replaced in _afterfork(), kept referenced so they are never
        # garbage collected.
        self._old_locks = []
        # Guards register() and stop().
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        *send* is called as ``send(conn, pid)`` by the background thread when
        a process asks for the resource; *close* is called with no arguments
        afterwards, or when the sharer is stopped or reset after a fork.
        The returned identifier is an ``(address, key)`` tuple suitable for
        get_connection().
        '''
        with self._lock:
            # Lazily start the listener and serving thread on first use.
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        *ident* is an ``(address, key)`` tuple as returned by register().
        The key and the pid of the calling process are sent over the new
        connection before it is returned.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        c.send((key, os.getpid()))
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to the thread's join(); if the thread is still
        alive afterwards a warning is emitted and shutdown continues anyway.
        Does nothing if the sharer was never started.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A None message tells the serving loop to exit.
                c = Client(self._address,
                           authkey=process.current_process().authkey)
                c.send(None)
                c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                # Release every resource that was registered but never sent.
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        '''Reset the sharer's state; registered as an after-fork callback.'''
        # Release every registered resource and forget it.
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon thread that serves it.

        Called from register() with self._lock held.
        '''
        from .connection import Listener
        assert self._listener is None, 'Already have Listener'
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Serve resource requests until a None message is received.

        Each request is a ``(key, destination_pid)`` tuple.  The matching
        registration is removed, its send callback is invoked with the
        connection and pid, and its close callback is always run afterwards.
        '''
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this helper thread.  range(1, NSIG) can contain
            # numbers that are not valid signals on some platforms, which
            # makes pthread_sigmask() fail, so prefer valid_signals() when
            # the running Python provides it.
            if hasattr(signal, 'valid_signals'):
                blocked = signal.valid_signals()
            else:
                blocked = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, blocked)
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Deliberately broad: this is the thread's top-level boundary.
                # Report the failure and keep serving later requests, unless
                # the process is already shutting down.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop