#
# Module providing the `SyncManager` class for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
34# 35 36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] 37 38# 39# Imports 40# 41 42import os 43import sys 44import weakref 45import threading 46import array 47import Queue 48 49from traceback import format_exc 50from multiprocessing import Process, current_process, active_children, Pool, util, connection 51from multiprocessing.process import AuthenticationString 52from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler 53from multiprocessing.util import Finalize, info 54 55try: 56 from cPickle import PicklingError 57except ImportError: 58 from pickle import PicklingError 59 60# 61# Register some things for pickling 62# 63 64def reduce_array(a): 65 return array.array, (a.typecode, a.tostring()) 66ForkingPickler.register(array.array, reduce_array) 67 68view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 69 70# 71# Type for identifying shared objects 72# 73 74class Token(object): 75 ''' 76 Type to uniquely indentify a shared object 77 ''' 78 __slots__ = ('typeid', 'address', 'id') 79 80 def __init__(self, typeid, address, id): 81 (self.typeid, self.address, self.id) = (typeid, address, id) 82 83 def __getstate__(self): 84 return (self.typeid, self.address, self.id) 85 86 def __setstate__(self, state): 87 (self.typeid, self.address, self.id) = state 88 89 def __repr__(self): 90 return 'Token(typeid=%r, address=%r, id=%r)' % \ 91 (self.typeid, self.address, self.id) 92 93# 94# Function for communication with a manager's server process 95# 96 97def dispatch(c, id, methodname, args=(), kwds={}): 98 ''' 99 Send a message to manager using connection `c` and return response 100 ''' 101 c.send((id, methodname, args, kwds)) 102 kind, result = c.recv() 103 if kind == '#RETURN': 104 return result 105 raise convert_to_error(kind, result) 106 107def convert_to_error(kind, result): 108 if kind == '#ERROR': 109 return result 110 elif kind == '#TRACEBACK': 111 assert type(result) is str 112 return 
RemoteError(result) 113 elif kind == '#UNSERIALIZABLE': 114 assert type(result) is str 115 return RemoteError('Unserializable message: %s\n' % result) 116 else: 117 return ValueError('Unrecognized message type') 118 119class RemoteError(Exception): 120 def __str__(self): 121 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 122 123# 124# Functions for finding the method names of an object 125# 126 127def all_methods(obj): 128 ''' 129 Return a list of names of methods of `obj` 130 ''' 131 temp = [] 132 for name in dir(obj): 133 func = getattr(obj, name) 134 if hasattr(func, '__call__'): 135 temp.append(name) 136 return temp 137 138def public_methods(obj): 139 ''' 140 Return a list of names of methods of `obj` which do not start with '_' 141 ''' 142 return [name for name in all_methods(obj) if name[0] != '_'] 143 144# 145# Server which is run in a process controlled by a manager 146# 147 148class Server(object): 149 ''' 150 Server class which runs in a process controlled by a manager object 151 ''' 152 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 153 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 154 155 def __init__(self, registry, address, authkey, serializer): 156 assert isinstance(authkey, bytes) 157 self.registry = registry 158 self.authkey = AuthenticationString(authkey) 159 Listener, Client = listener_client[serializer] 160 161 # do authentication later 162 self.listener = Listener(address=address, backlog=16) 163 self.address = self.listener.address 164 165 self.id_to_obj = {'0': (None, ())} 166 self.id_to_refcount = {} 167 self.mutex = threading.RLock() 168 self.stop = 0 169 170 def serve_forever(self): 171 ''' 172 Run the server forever 173 ''' 174 current_process()._manager_server = self 175 try: 176 try: 177 while 1: 178 try: 179 c = self.listener.accept() 180 except (OSError, IOError): 181 continue 182 t = threading.Thread(target=self.handle_request, args=(c,)) 183 t.daemon = True 184 t.start() 185 except 
(KeyboardInterrupt, SystemExit): 186 pass 187 finally: 188 self.stop = 999 189 self.listener.close() 190 191 def handle_request(self, c): 192 ''' 193 Handle a new connection 194 ''' 195 funcname = result = request = None 196 try: 197 connection.deliver_challenge(c, self.authkey) 198 connection.answer_challenge(c, self.authkey) 199 request = c.recv() 200 ignore, funcname, args, kwds = request 201 assert funcname in self.public, '%r unrecognized' % funcname 202 func = getattr(self, funcname) 203 except Exception: 204 msg = ('#TRACEBACK', format_exc()) 205 else: 206 try: 207 result = func(c, *args, **kwds) 208 except Exception: 209 msg = ('#TRACEBACK', format_exc()) 210 else: 211 msg = ('#RETURN', result) 212 try: 213 c.send(msg) 214 except Exception, e: 215 try: 216 c.send(('#TRACEBACK', format_exc())) 217 except Exception: 218 pass 219 util.info('Failure to send message: %r', msg) 220 util.info(' ... request was %r', request) 221 util.info(' ... exception was %r', e) 222 223 c.close() 224 225 def serve_client(self, conn): 226 ''' 227 Handle requests from the proxies in a particular process/thread 228 ''' 229 util.debug('starting server thread to service %r', 230 threading.current_thread().name) 231 232 recv = conn.recv 233 send = conn.send 234 id_to_obj = self.id_to_obj 235 236 while not self.stop: 237 238 try: 239 methodname = obj = None 240 request = recv() 241 ident, methodname, args, kwds = request 242 obj, exposed, gettypeid = id_to_obj[ident] 243 244 if methodname not in exposed: 245 raise AttributeError( 246 'method %r of %r object is not in exposed=%r' % 247 (methodname, type(obj), exposed) 248 ) 249 250 function = getattr(obj, methodname) 251 252 try: 253 res = function(*args, **kwds) 254 except Exception, e: 255 msg = ('#ERROR', e) 256 else: 257 typeid = gettypeid and gettypeid.get(methodname, None) 258 if typeid: 259 rident, rexposed = self.create(conn, typeid, res) 260 token = Token(typeid, self.address, rident) 261 msg = ('#PROXY', (rexposed, token)) 
262 else: 263 msg = ('#RETURN', res) 264 265 except AttributeError: 266 if methodname is None: 267 msg = ('#TRACEBACK', format_exc()) 268 else: 269 try: 270 fallback_func = self.fallback_mapping[methodname] 271 result = fallback_func( 272 self, conn, ident, obj, *args, **kwds 273 ) 274 msg = ('#RETURN', result) 275 except Exception: 276 msg = ('#TRACEBACK', format_exc()) 277 278 except EOFError: 279 util.debug('got EOF -- exiting thread serving %r', 280 threading.current_thread().name) 281 sys.exit(0) 282 283 except Exception: 284 msg = ('#TRACEBACK', format_exc()) 285 286 try: 287 try: 288 send(msg) 289 except Exception, e: 290 send(('#UNSERIALIZABLE', format_exc())) 291 except Exception, e: 292 util.info('exception in thread serving %r', 293 threading.current_thread().name) 294 util.info(' ... message was %r', msg) 295 util.info(' ... exception was %r', e) 296 conn.close() 297 sys.exit(1) 298 299 def fallback_getvalue(self, conn, ident, obj): 300 return obj 301 302 def fallback_str(self, conn, ident, obj): 303 return str(obj) 304 305 def fallback_repr(self, conn, ident, obj): 306 return repr(obj) 307 308 fallback_mapping = { 309 '__str__':fallback_str, 310 '__repr__':fallback_repr, 311 '#GETVALUE':fallback_getvalue 312 } 313 314 def dummy(self, c): 315 pass 316 317 def debug_info(self, c): 318 ''' 319 Return some info --- useful to spot problems with refcounting 320 ''' 321 self.mutex.acquire() 322 try: 323 result = [] 324 keys = self.id_to_obj.keys() 325 keys.sort() 326 for ident in keys: 327 if ident != '0': 328 result.append(' %s: refcount=%s\n %s' % 329 (ident, self.id_to_refcount[ident], 330 str(self.id_to_obj[ident][0])[:75])) 331 return '\n'.join(result) 332 finally: 333 self.mutex.release() 334 335 def number_of_objects(self, c): 336 ''' 337 Number of shared objects 338 ''' 339 return len(self.id_to_obj) - 1 # don't count ident='0' 340 341 def shutdown(self, c): 342 ''' 343 Shutdown this process 344 ''' 345 try: 346 try: 347 util.debug('manager received 
shutdown message') 348 c.send(('#RETURN', None)) 349 350 if sys.stdout != sys.__stdout__: 351 util.debug('resetting stdout, stderr') 352 sys.stdout = sys.__stdout__ 353 sys.stderr = sys.__stderr__ 354 355 util._run_finalizers(0) 356 357 for p in active_children(): 358 util.debug('terminating a child process of manager') 359 p.terminate() 360 361 for p in active_children(): 362 util.debug('terminating a child process of manager') 363 p.join() 364 365 util._run_finalizers() 366 util.info('manager exiting with exitcode 0') 367 except: 368 import traceback 369 traceback.print_exc() 370 finally: 371 exit(0) 372 373 def create(self, c, typeid, *args, **kwds): 374 ''' 375 Create a new shared object and return its id 376 ''' 377 self.mutex.acquire() 378 try: 379 callable, exposed, method_to_typeid, proxytype = \ 380 self.registry[typeid] 381 382 if callable is None: 383 assert len(args) == 1 and not kwds 384 obj = args[0] 385 else: 386 obj = callable(*args, **kwds) 387 388 if exposed is None: 389 exposed = public_methods(obj) 390 if method_to_typeid is not None: 391 assert type(method_to_typeid) is dict 392 exposed = list(exposed) + list(method_to_typeid) 393 394 ident = '%x' % id(obj) # convert to string because xmlrpclib 395 # only has 32 bit signed integers 396 util.debug('%r callable returned object with id %r', typeid, ident) 397 398 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 399 if ident not in self.id_to_refcount: 400 self.id_to_refcount[ident] = 0 401 # increment the reference count immediately, to avoid 402 # this object being garbage collected before a Proxy 403 # object for it can be created. The caller of create() 404 # is responsible for doing a decref once the Proxy object 405 # has been created. 
406 self.incref(c, ident) 407 return ident, tuple(exposed) 408 finally: 409 self.mutex.release() 410 411 def get_methods(self, c, token): 412 ''' 413 Return the methods of the shared object indicated by token 414 ''' 415 return tuple(self.id_to_obj[token.id][1]) 416 417 def accept_connection(self, c, name): 418 ''' 419 Spawn a new thread to serve this connection 420 ''' 421 threading.current_thread().name = name 422 c.send(('#RETURN', None)) 423 self.serve_client(c) 424 425 def incref(self, c, ident): 426 self.mutex.acquire() 427 try: 428 self.id_to_refcount[ident] += 1 429 finally: 430 self.mutex.release() 431 432 def decref(self, c, ident): 433 self.mutex.acquire() 434 try: 435 assert self.id_to_refcount[ident] >= 1 436 self.id_to_refcount[ident] -= 1 437 if self.id_to_refcount[ident] == 0: 438 del self.id_to_obj[ident], self.id_to_refcount[ident] 439 util.debug('disposing of obj with id %r', ident) 440 finally: 441 self.mutex.release() 442 443# 444# Class to represent state of a manager 445# 446 447class State(object): 448 __slots__ = ['value'] 449 INITIAL = 0 450 STARTED = 1 451 SHUTDOWN = 2 452 453# 454# Mapping from serializer name to Listener and Client types 455# 456 457listener_client = { 458 'pickle' : (connection.Listener, connection.Client), 459 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 460 } 461 462# 463# Definition of BaseManager 464# 465 466class BaseManager(object): 467 ''' 468 Base class for managers 469 ''' 470 _registry = {} 471 _Server = Server 472 473 def __init__(self, address=None, authkey=None, serializer='pickle'): 474 if authkey is None: 475 authkey = current_process().authkey 476 self._address = address # XXX not final address if eg ('', 0) 477 self._authkey = AuthenticationString(authkey) 478 self._state = State() 479 self._state.value = State.INITIAL 480 self._serializer = serializer 481 self._Listener, self._Client = listener_client[serializer] 482 483 def __reduce__(self): 484 return type(self).from_address, \ 485 
(self._address, self._authkey, self._serializer) 486 487 def get_server(self): 488 ''' 489 Return server object with serve_forever() method and address attribute 490 ''' 491 assert self._state.value == State.INITIAL 492 return Server(self._registry, self._address, 493 self._authkey, self._serializer) 494 495 def connect(self): 496 ''' 497 Connect manager object to the server process 498 ''' 499 Listener, Client = listener_client[self._serializer] 500 conn = Client(self._address, authkey=self._authkey) 501 dispatch(conn, None, 'dummy') 502 self._state.value = State.STARTED 503 504 def start(self, initializer=None, initargs=()): 505 ''' 506 Spawn a server process for this manager object 507 ''' 508 assert self._state.value == State.INITIAL 509 510 if initializer is not None and not hasattr(initializer, '__call__'): 511 raise TypeError('initializer must be a callable') 512 513 # pipe over which we will retrieve address of server 514 reader, writer = connection.Pipe(duplex=False) 515 516 # spawn process which runs a server 517 self._process = Process( 518 target=type(self)._run_server, 519 args=(self._registry, self._address, self._authkey, 520 self._serializer, writer, initializer, initargs), 521 ) 522 ident = ':'.join(str(i) for i in self._process._identity) 523 self._process.name = type(self).__name__ + '-' + ident 524 self._process.start() 525 526 # get address of server 527 writer.close() 528 self._address = reader.recv() 529 reader.close() 530 531 # register a finalizer 532 self._state.value = State.STARTED 533 self.shutdown = util.Finalize( 534 self, type(self)._finalize_manager, 535 args=(self._process, self._address, self._authkey, 536 self._state, self._Client), 537 exitpriority=0 538 ) 539 540 @classmethod 541 def _run_server(cls, registry, address, authkey, serializer, writer, 542 initializer=None, initargs=()): 543 ''' 544 Create a server, report its address and run it 545 ''' 546 if initializer is not None: 547 initializer(*initargs) 548 549 # create 
server 550 server = cls._Server(registry, address, authkey, serializer) 551 552 # inform parent process of the server's address 553 writer.send(server.address) 554 writer.close() 555 556 # run the manager 557 util.info('manager serving at %r', server.address) 558 server.serve_forever() 559 560 def _create(self, typeid, *args, **kwds): 561 ''' 562 Create a new shared object; return the token and exposed tuple 563 ''' 564 assert self._state.value == State.STARTED, 'server not yet started' 565 conn = self._Client(self._address, authkey=self._authkey) 566 try: 567 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 568 finally: 569 conn.close() 570 return Token(typeid, self._address, id), exposed 571 572 def join(self, timeout=None): 573 ''' 574 Join the manager process (if it has been spawned) 575 ''' 576 self._process.join(timeout) 577 578 def _debug_info(self): 579 ''' 580 Return some info about the servers shared objects and connections 581 ''' 582 conn = self._Client(self._address, authkey=self._authkey) 583 try: 584 return dispatch(conn, None, 'debug_info') 585 finally: 586 conn.close() 587 588 def _number_of_objects(self): 589 ''' 590 Return the number of shared objects 591 ''' 592 conn = self._Client(self._address, authkey=self._authkey) 593 try: 594 return dispatch(conn, None, 'number_of_objects') 595 finally: 596 conn.close() 597 598 def __enter__(self): 599 return self 600 601 def __exit__(self, exc_type, exc_val, exc_tb): 602 self.shutdown() 603 604 @staticmethod 605 def _finalize_manager(process, address, authkey, state, _Client): 606 ''' 607 Shutdown the manager process; will be registered as a finalizer 608 ''' 609 if process.is_alive(): 610 util.info('sending shutdown message to manager') 611 try: 612 conn = _Client(address, authkey=authkey) 613 try: 614 dispatch(conn, None, 'shutdown') 615 finally: 616 conn.close() 617 except Exception: 618 pass 619 620 process.join(timeout=0.2) 621 if process.is_alive(): 622 util.info('manager still 
alive') 623 if hasattr(process, 'terminate'): 624 util.info('trying to `terminate()` manager process') 625 process.terminate() 626 process.join(timeout=0.1) 627 if process.is_alive(): 628 util.info('manager still alive after terminate') 629 630 state.value = State.SHUTDOWN 631 try: 632 del BaseProxy._address_to_local[address] 633 except KeyError: 634 pass 635 636 address = property(lambda self: self._address) 637 638 @classmethod 639 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 640 method_to_typeid=None, create_method=True): 641 ''' 642 Register a typeid with the manager type 643 ''' 644 if '_registry' not in cls.__dict__: 645 cls._registry = cls._registry.copy() 646 647 if proxytype is None: 648 proxytype = AutoProxy 649 650 exposed = exposed or getattr(proxytype, '_exposed_', None) 651 652 method_to_typeid = method_to_typeid or \ 653 getattr(proxytype, '_method_to_typeid_', None) 654 655 if method_to_typeid: 656 for key, value in method_to_typeid.items(): 657 assert type(key) is str, '%r is not a string' % key 658 assert type(value) is str, '%r is not a string' % value 659 660 cls._registry[typeid] = ( 661 callable, exposed, method_to_typeid, proxytype 662 ) 663 664 if create_method: 665 def temp(self, *args, **kwds): 666 util.debug('requesting creation of a shared %r object', typeid) 667 token, exp = self._create(typeid, *args, **kwds) 668 proxy = proxytype( 669 token, self._serializer, manager=self, 670 authkey=self._authkey, exposed=exp 671 ) 672 conn = self._Client(token.address, authkey=self._authkey) 673 dispatch(conn, None, 'decref', (token.id,)) 674 return proxy 675 temp.__name__ = typeid 676 setattr(cls, typeid, temp) 677 678# 679# Subclass of set which get cleared after a fork 680# 681 682class ProcessLocalSet(set): 683 def __init__(self): 684 util.register_after_fork(self, lambda obj: obj.clear()) 685 def __reduce__(self): 686 return type(self), () 687 688# 689# Definition of BaseProxy 690# 691 692class BaseProxy(object): 693 
''' 694 A base for proxies of shared objects 695 ''' 696 _address_to_local = {} 697 _mutex = util.ForkAwareThreadLock() 698 699 def __init__(self, token, serializer, manager=None, 700 authkey=None, exposed=None, incref=True): 701 BaseProxy._mutex.acquire() 702 try: 703 tls_idset = BaseProxy._address_to_local.get(token.address, None) 704 if tls_idset is None: 705 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 706 BaseProxy._address_to_local[token.address] = tls_idset 707 finally: 708 BaseProxy._mutex.release() 709 710 # self._tls is used to record the connection used by this 711 # thread to communicate with the manager at token.address 712 self._tls = tls_idset[0] 713 714 # self._idset is used to record the identities of all shared 715 # objects for which the current process owns references and 716 # which are in the manager at token.address 717 self._idset = tls_idset[1] 718 719 self._token = token 720 self._id = self._token.id 721 self._manager = manager 722 self._serializer = serializer 723 self._Client = listener_client[serializer][1] 724 725 if authkey is not None: 726 self._authkey = AuthenticationString(authkey) 727 elif self._manager is not None: 728 self._authkey = self._manager._authkey 729 else: 730 self._authkey = current_process().authkey 731 732 if incref: 733 self._incref() 734 735 util.register_after_fork(self, BaseProxy._after_fork) 736 737 def _connect(self): 738 util.debug('making connection to manager') 739 name = current_process().name 740 if threading.current_thread().name != 'MainThread': 741 name += '|' + threading.current_thread().name 742 conn = self._Client(self._token.address, authkey=self._authkey) 743 dispatch(conn, None, 'accept_connection', (name,)) 744 self._tls.connection = conn 745 746 def _callmethod(self, methodname, args=(), kwds={}): 747 ''' 748 Try to call a method of the referrent and return a copy of the result 749 ''' 750 try: 751 conn = self._tls.connection 752 except AttributeError: 753 util.debug('thread %r does 
not own a connection', 754 threading.current_thread().name) 755 self._connect() 756 conn = self._tls.connection 757 758 conn.send((self._id, methodname, args, kwds)) 759 kind, result = conn.recv() 760 761 if kind == '#RETURN': 762 return result 763 elif kind == '#PROXY': 764 exposed, token = result 765 proxytype = self._manager._registry[token.typeid][-1] 766 token.address = self._token.address 767 proxy = proxytype( 768 token, self._serializer, manager=self._manager, 769 authkey=self._authkey, exposed=exposed 770 ) 771 conn = self._Client(token.address, authkey=self._authkey) 772 dispatch(conn, None, 'decref', (token.id,)) 773 return proxy 774 raise convert_to_error(kind, result) 775 776 def _getvalue(self): 777 ''' 778 Get a copy of the value of the referent 779 ''' 780 return self._callmethod('#GETVALUE') 781 782 def _incref(self): 783 conn = self._Client(self._token.address, authkey=self._authkey) 784 dispatch(conn, None, 'incref', (self._id,)) 785 util.debug('INCREF %r', self._token.id) 786 787 self._idset.add(self._id) 788 789 state = self._manager and self._manager._state 790 791 self._close = util.Finalize( 792 self, BaseProxy._decref, 793 args=(self._token, self._authkey, state, 794 self._tls, self._idset, self._Client), 795 exitpriority=10 796 ) 797 798 @staticmethod 799 def _decref(token, authkey, state, tls, idset, _Client): 800 idset.discard(token.id) 801 802 # check whether manager is still alive 803 if state is None or state.value == State.STARTED: 804 # tell manager this process no longer cares about referent 805 try: 806 util.debug('DECREF %r', token.id) 807 conn = _Client(token.address, authkey=authkey) 808 dispatch(conn, None, 'decref', (token.id,)) 809 except Exception, e: 810 util.debug('... 
decref failed %s', e) 811 812 else: 813 util.debug('DECREF %r -- manager already shutdown', token.id) 814 815 # check whether we can close this thread's connection because 816 # the process owns no more references to objects for this manager 817 if not idset and hasattr(tls, 'connection'): 818 util.debug('thread %r has no more proxies so closing conn', 819 threading.current_thread().name) 820 tls.connection.close() 821 del tls.connection 822 823 def _after_fork(self): 824 self._manager = None 825 try: 826 self._incref() 827 except Exception, e: 828 # the proxy may just be for a manager which has shutdown 829 util.info('incref failed: %s' % e) 830 831 def __reduce__(self): 832 kwds = {} 833 if Popen.thread_is_spawning(): 834 kwds['authkey'] = self._authkey 835 836 if getattr(self, '_isauto', False): 837 kwds['exposed'] = self._exposed_ 838 return (RebuildProxy, 839 (AutoProxy, self._token, self._serializer, kwds)) 840 else: 841 return (RebuildProxy, 842 (type(self), self._token, self._serializer, kwds)) 843 844 def __deepcopy__(self, memo): 845 return self._getvalue() 846 847 def __repr__(self): 848 return '<%s object, typeid %r at %s>' % \ 849 (type(self).__name__, self._token.typeid, '0x%x' % id(self)) 850 851 def __str__(self): 852 ''' 853 Return representation of the referent (or a fall-back if that fails) 854 ''' 855 try: 856 return self._callmethod('__repr__') 857 except Exception: 858 return repr(self)[:-1] + "; '__str__()' failed>" 859 860# 861# Function used for unpickling 862# 863 864def RebuildProxy(func, token, serializer, kwds): 865 ''' 866 Function used for unpickling proxy objects. 867 868 If possible the shared object is returned, or otherwise a proxy for it. 
869 ''' 870 server = getattr(current_process(), '_manager_server', None) 871 872 if server and server.address == token.address: 873 return server.id_to_obj[token.id][0] 874 else: 875 incref = ( 876 kwds.pop('incref', True) and 877 not getattr(current_process(), '_inheriting', False) 878 ) 879 return func(token, serializer, incref=incref, **kwds) 880 881# 882# Functions to create proxies and proxy types 883# 884 885def MakeProxyType(name, exposed, _cache={}): 886 ''' 887 Return a proxy type whose methods are given by `exposed` 888 ''' 889 exposed = tuple(exposed) 890 try: 891 return _cache[(name, exposed)] 892 except KeyError: 893 pass 894 895 dic = {} 896 897 for meth in exposed: 898 exec '''def %s(self, *args, **kwds): 899 return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic 900 901 ProxyType = type(name, (BaseProxy,), dic) 902 ProxyType._exposed_ = exposed 903 _cache[(name, exposed)] = ProxyType 904 return ProxyType 905 906 907def AutoProxy(token, serializer, manager=None, authkey=None, 908 exposed=None, incref=True): 909 ''' 910 Return an auto-proxy for `token` 911 ''' 912 _Client = listener_client[serializer][1] 913 914 if exposed is None: 915 conn = _Client(token.address, authkey=authkey) 916 try: 917 exposed = dispatch(conn, None, 'get_methods', (token,)) 918 finally: 919 conn.close() 920 921 if authkey is None and manager is not None: 922 authkey = manager._authkey 923 if authkey is None: 924 authkey = current_process().authkey 925 926 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 927 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 928 incref=incref) 929 proxy._isauto = True 930 return proxy 931 932# 933# Types/callables which we will register with SyncManager 934# 935 936class Namespace(object): 937 def __init__(self, **kwds): 938 self.__dict__.update(kwds) 939 def __repr__(self): 940 items = self.__dict__.items() 941 temp = [] 942 for name, value in items: 943 if not name.startswith('_'): 944 
temp.append('%s=%r' % (name, value)) 945 temp.sort() 946 return 'Namespace(%s)' % str.join(', ', temp) 947 948class Value(object): 949 def __init__(self, typecode, value, lock=True): 950 self._typecode = typecode 951 self._value = value 952 def get(self): 953 return self._value 954 def set(self, value): 955 self._value = value 956 def __repr__(self): 957 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 958 value = property(get, set) 959 960def Array(typecode, sequence, lock=True): 961 return array.array(typecode, sequence) 962 963# 964# Proxy types used by SyncManager 965# 966 967class IteratorProxy(BaseProxy): 968 # XXX remove methods for Py3.0 and Py2.6 969 _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') 970 def __iter__(self): 971 return self 972 def __next__(self, *args): 973 return self._callmethod('__next__', args) 974 def next(self, *args): 975 return self._callmethod('next', args) 976 def send(self, *args): 977 return self._callmethod('send', args) 978 def throw(self, *args): 979 return self._callmethod('throw', args) 980 def close(self, *args): 981 return self._callmethod('close', args) 982 983 984class AcquirerProxy(BaseProxy): 985 _exposed_ = ('acquire', 'release') 986 def acquire(self, blocking=True): 987 return self._callmethod('acquire', (blocking,)) 988 def release(self): 989 return self._callmethod('release') 990 def __enter__(self): 991 return self._callmethod('acquire') 992 def __exit__(self, exc_type, exc_val, exc_tb): 993 return self._callmethod('release') 994 995 996class ConditionProxy(AcquirerProxy): 997 # XXX will Condition.notfyAll() name be available in Py3.0? 
998 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 999 def wait(self, timeout=None): 1000 return self._callmethod('wait', (timeout,)) 1001 def notify(self): 1002 return self._callmethod('notify') 1003 def notify_all(self): 1004 return self._callmethod('notify_all') 1005 1006class EventProxy(BaseProxy): 1007 _exposed_ = ('is_set', 'set', 'clear', 'wait') 1008 def is_set(self): 1009 return self._callmethod('is_set') 1010 def set(self): 1011 return self._callmethod('set') 1012 def clear(self): 1013 return self._callmethod('clear') 1014 def wait(self, timeout=None): 1015 return self._callmethod('wait', (timeout,)) 1016 1017class NamespaceProxy(BaseProxy): 1018 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1019 def __getattr__(self, key): 1020 if key[0] == '_': 1021 return object.__getattribute__(self, key) 1022 callmethod = object.__getattribute__(self, '_callmethod') 1023 return callmethod('__getattribute__', (key,)) 1024 def __setattr__(self, key, value): 1025 if key[0] == '_': 1026 return object.__setattr__(self, key, value) 1027 callmethod = object.__getattribute__(self, '_callmethod') 1028 return callmethod('__setattr__', (key, value)) 1029 def __delattr__(self, key): 1030 if key[0] == '_': 1031 return object.__delattr__(self, key) 1032 callmethod = object.__getattribute__(self, '_callmethod') 1033 return callmethod('__delattr__', (key,)) 1034 1035 1036class ValueProxy(BaseProxy): 1037 _exposed_ = ('get', 'set') 1038 def get(self): 1039 return self._callmethod('get') 1040 def set(self, value): 1041 return self._callmethod('set', (value,)) 1042 value = property(get, set) 1043 1044 1045BaseListProxy = MakeProxyType('BaseListProxy', ( 1046 '__add__', '__contains__', '__delitem__', '__delslice__', 1047 '__getitem__', '__getslice__', '__len__', '__mul__', 1048 '__reversed__', '__rmul__', '__setitem__', '__setslice__', 1049 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1050 'reverse', 'sort', '__imul__' 1051 )) # 
XXX __getslice__ and __setslice__ unneeded in Py3.0 1052class ListProxy(BaseListProxy): 1053 def __iadd__(self, value): 1054 self._callmethod('extend', (value,)) 1055 return self 1056 def __imul__(self, value): 1057 self._callmethod('__imul__', (value,)) 1058 return self 1059 1060 1061DictProxy = MakeProxyType('DictProxy', ( 1062 '__contains__', '__delitem__', '__getitem__', '__len__', 1063 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', 1064 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1065 )) 1066 1067 1068ArrayProxy = MakeProxyType('ArrayProxy', ( 1069 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' 1070 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 1071 1072 1073PoolProxy = MakeProxyType('PoolProxy', ( 1074 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1075 'map', 'map_async', 'terminate' 1076 )) 1077PoolProxy._method_to_typeid_ = { 1078 'apply_async': 'AsyncResult', 1079 'map_async': 'AsyncResult', 1080 'imap': 'Iterator', 1081 'imap_unordered': 'Iterator' 1082 } 1083 1084# 1085# Definition of SyncManager 1086# 1087 1088class SyncManager(BaseManager): 1089 ''' 1090 Subclass of `BaseManager` which supports a number of shared object types. 1091 1092 The types registered are those intended for the synchronization 1093 of threads, plus `dict`, `list` and `Namespace`. 1094 1095 The `multiprocessing.Manager()` function creates started instances of 1096 this class. 
1097 ''' 1098 1099SyncManager.register('Queue', Queue.Queue) 1100SyncManager.register('JoinableQueue', Queue.Queue) 1101SyncManager.register('Event', threading.Event, EventProxy) 1102SyncManager.register('Lock', threading.Lock, AcquirerProxy) 1103SyncManager.register('RLock', threading.RLock, AcquirerProxy) 1104SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1105SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1106 AcquirerProxy) 1107SyncManager.register('Condition', threading.Condition, ConditionProxy) 1108SyncManager.register('Pool', Pool, PoolProxy) 1109SyncManager.register('list', list, ListProxy) 1110SyncManager.register('dict', dict, DictProxy) 1111SyncManager.register('Value', Value, ValueProxy) 1112SyncManager.register('Array', Array, ArrayProxy) 1113SyncManager.register('Namespace', Namespace, NamespaceProxy) 1114 1115# types returned by methods of PoolProxy 1116SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1117SyncManager.register('AsyncResult', create_method=False) 1118