1# 2# Module to allow connection and socket objects to be transferred 3# between processes 4# 5# multiprocessing/reduction.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# All rights reserved. 9# 10# Redistribution and use in source and binary forms, with or without 11# modification, are permitted provided that the following conditions 12# are met: 13# 14# 1. Redistributions of source code must retain the above copyright 15# notice, this list of conditions and the following disclaimer. 16# 2. Redistributions in binary form must reproduce the above copyright 17# notice, this list of conditions and the following disclaimer in the 18# documentation and/or other materials provided with the distribution. 19# 3. Neither the name of author nor the names of any contributors may be 20# used to endorse or promote products derived from this software 21# without specific prior written permission. 22# 23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33# SUCH DAMAGE. 34# 35 36__all__ = [] 37 38import os 39import sys 40import socket 41import threading 42 43import _multiprocessing 44from multiprocessing import current_process 45from multiprocessing.forking import Popen, duplicate, close, ForkingPickler 46from multiprocessing.util import register_after_fork, debug, sub_debug 47from multiprocessing.connection import Client, Listener 48 49 50# 51# 52# 53 54if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): 55 raise ImportError('pickling of connections not supported') 56 57# 58# Platform specific definitions 59# 60 61if sys.platform == 'win32': 62 import _subprocess 63 from _multiprocessing import win32 64 65 def send_handle(conn, handle, destination_pid): 66 process_handle = win32.OpenProcess( 67 win32.PROCESS_ALL_ACCESS, False, destination_pid 68 ) 69 try: 70 new_handle = duplicate(handle, process_handle) 71 conn.send(new_handle) 72 finally: 73 close(process_handle) 74 75 def recv_handle(conn): 76 return conn.recv() 77 78else: 79 def send_handle(conn, handle, destination_pid): 80 _multiprocessing.sendfd(conn.fileno(), handle) 81 82 def recv_handle(conn): 83 return _multiprocessing.recvfd(conn.fileno()) 84 85# 86# Support for a per-process server thread which caches pickled handles 87# 88 89_cache = set() 90 91def _reset(obj): 92 global _lock, _listener, _cache 93 for h in _cache: 94 close(h) 95 _cache.clear() 96 _lock = threading.Lock() 97 _listener = None 98 99_reset(None) 100register_after_fork(_reset, _reset) 101 102def _get_listener(): 103 global _listener 104 105 if _listener is None: 106 _lock.acquire() 107 try: 108 if _listener is None: 109 debug('starting listener and thread for sending handles') 110 _listener = Listener(authkey=current_process().authkey) 111 t = threading.Thread(target=_serve) 112 t.daemon = True 113 t.start() 114 finally: 115 _lock.release() 116 117 return _listener 118 119def _serve(): 120 from .util import is_exiting, sub_warning 121 122 while 1: 123 try: 124 conn = _listener.accept() 125 handle_wanted, destination_pid = conn.recv() 126 _cache.remove(handle_wanted) 127 send_handle(conn, handle_wanted, destination_pid) 128 close(handle_wanted) 129 conn.close() 130 except: 131 if not is_exiting(): 132 import traceback 133 sub_warning( 134 'thread for sharing handles raised exception :\n' + 135 '-'*79 + '\n' + traceback.format_exc() + '-'*79 136 ) 137 138# 139# Functions to be used for pickling/unpickling objects with handles 140# 141 142def reduce_handle(handle): 143 if Popen.thread_is_spawning(): 144 return (None, Popen.duplicate_for_child(handle), True) 145 dup_handle = duplicate(handle) 146 _cache.add(dup_handle) 147 sub_debug('reducing handle %d', handle) 148 return (_get_listener().address, dup_handle, False) 149 150def rebuild_handle(pickled_data): 151 address, handle, inherited = pickled_data 152 if inherited: 153 return handle 154 sub_debug('rebuilding handle %d', handle) 155 conn = Client(address, authkey=current_process().authkey) 156 conn.send((handle, os.getpid())) 157 new_handle = recv_handle(conn) 158 conn.close() 159 return new_handle 160 161# 162# Register `_multiprocessing.Connection` with `ForkingPickler` 163# 164 165def reduce_connection(conn): 166 rh = reduce_handle(conn.fileno()) 167 return rebuild_connection, (rh, conn.readable, conn.writable) 168 169def rebuild_connection(reduced_handle, readable, writable): 170 handle = rebuild_handle(reduced_handle) 171 return _multiprocessing.Connection( 172 handle, readable=readable, writable=writable 173 ) 174 175ForkingPickler.register(_multiprocessing.Connection, reduce_connection) 176 177# 178# Register `socket.socket` with `ForkingPickler` 179# 180 181def fromfd(fd, family, type_, proto=0): 182 s = socket.fromfd(fd, family, type_, proto) 183 if s.__class__ is not socket.socket: 184 s = socket.socket(_sock=s) 185 return s 186 187def reduce_socket(s): 188 reduced_handle = reduce_handle(s.fileno()) 189 return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) 190 191def rebuild_socket(reduced_handle, family, type_, proto): 192 fd = rebuild_handle(reduced_handle) 193 _sock = fromfd(fd, family, type_, proto) 194 close(fd) 195 return _sock 196 197ForkingPickler.register(socket.socket, reduce_socket) 198 199# 200# Register `_multiprocessing.PipeConnection` with `ForkingPickler` 201# 202 203if sys.platform == 'win32': 204 205 def reduce_pipe_connection(conn): 206 rh = reduce_handle(conn.fileno()) 207 return rebuild_pipe_connection, (rh, conn.readable, conn.writable) 208 209 def rebuild_pipe_connection(reduced_handle, readable, writable): 210 handle = rebuild_handle(reduced_handle) 211 return _multiprocessing.PipeConnection( 212 handle, readable=readable, writable=writable 213 ) 214 215 ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) 216