• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15#    notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17#    notice, this list of conditions and the following disclaimer in the
18#    documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20#    used to endorse or promote products derived from this software
21#    without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
34#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
42import os
43import sys
44import weakref
45import threading
46import array
47import Queue
48
49from traceback import format_exc
50from multiprocessing import Process, current_process, active_children, Pool, util, connection
51from multiprocessing.process import AuthenticationString
52from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
53from multiprocessing.util import Finalize, info
54
55try:
56    from cPickle import PicklingError
57except ImportError:
58    from pickle import PicklingError
59
60#
61# Register some things for pickling
62#
63
64def reduce_array(a):
65    return array.array, (a.typecode, a.tostring())
66ForkingPickler.register(array.array, reduce_array)
67
68view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
69
70#
71# Type for identifying shared objects
72#
73
74class Token(object):
75    '''
76    Type to uniquely indentify a shared object
77    '''
78    __slots__ = ('typeid', 'address', 'id')
79
80    def __init__(self, typeid, address, id):
81        (self.typeid, self.address, self.id) = (typeid, address, id)
82
83    def __getstate__(self):
84        return (self.typeid, self.address, self.id)
85
86    def __setstate__(self, state):
87        (self.typeid, self.address, self.id) = state
88
89    def __repr__(self):
90        return 'Token(typeid=%r, address=%r, id=%r)' % \
91               (self.typeid, self.address, self.id)
92
93#
94# Function for communication with a manager's server process
95#
96
97def dispatch(c, id, methodname, args=(), kwds={}):
98    '''
99    Send a message to manager using connection `c` and return response
100    '''
101    c.send((id, methodname, args, kwds))
102    kind, result = c.recv()
103    if kind == '#RETURN':
104        return result
105    raise convert_to_error(kind, result)
106
107def convert_to_error(kind, result):
108    if kind == '#ERROR':
109        return result
110    elif kind == '#TRACEBACK':
111        assert type(result) is str
112        return  RemoteError(result)
113    elif kind == '#UNSERIALIZABLE':
114        assert type(result) is str
115        return RemoteError('Unserializable message: %s\n' % result)
116    else:
117        return ValueError('Unrecognized message type')
118
119class RemoteError(Exception):
120    def __str__(self):
121        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
122
123#
124# Functions for finding the method names of an object
125#
126
127def all_methods(obj):
128    '''
129    Return a list of names of methods of `obj`
130    '''
131    temp = []
132    for name in dir(obj):
133        func = getattr(obj, name)
134        if hasattr(func, '__call__'):
135            temp.append(name)
136    return temp
137
138def public_methods(obj):
139    '''
140    Return a list of names of methods of `obj` which do not start with '_'
141    '''
142    return [name for name in all_methods(obj) if name[0] != '_']
143
144#
145# Server which is run in a process controlled by a manager
146#
147
148class Server(object):
149    '''
150    Server class which runs in a process controlled by a manager object
151    '''
152    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
153              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
154
155    def __init__(self, registry, address, authkey, serializer):
156        assert isinstance(authkey, bytes)
157        self.registry = registry
158        self.authkey = AuthenticationString(authkey)
159        Listener, Client = listener_client[serializer]
160
161        # do authentication later
162        self.listener = Listener(address=address, backlog=16)
163        self.address = self.listener.address
164
165        self.id_to_obj = {'0': (None, ())}
166        self.id_to_refcount = {}
167        self.mutex = threading.RLock()
168        self.stop = 0
169
170    def serve_forever(self):
171        '''
172        Run the server forever
173        '''
174        current_process()._manager_server = self
175        try:
176            try:
177                while 1:
178                    try:
179                        c = self.listener.accept()
180                    except (OSError, IOError):
181                        continue
182                    t = threading.Thread(target=self.handle_request, args=(c,))
183                    t.daemon = True
184                    t.start()
185            except (KeyboardInterrupt, SystemExit):
186                pass
187        finally:
188            self.stop = 999
189            self.listener.close()
190
191    def handle_request(self, c):
192        '''
193        Handle a new connection
194        '''
195        funcname = result = request = None
196        try:
197            connection.deliver_challenge(c, self.authkey)
198            connection.answer_challenge(c, self.authkey)
199            request = c.recv()
200            ignore, funcname, args, kwds = request
201            assert funcname in self.public, '%r unrecognized' % funcname
202            func = getattr(self, funcname)
203        except Exception:
204            msg = ('#TRACEBACK', format_exc())
205        else:
206            try:
207                result = func(c, *args, **kwds)
208            except Exception:
209                msg = ('#TRACEBACK', format_exc())
210            else:
211                msg = ('#RETURN', result)
212        try:
213            c.send(msg)
214        except Exception, e:
215            try:
216                c.send(('#TRACEBACK', format_exc()))
217            except Exception:
218                pass
219            util.info('Failure to send message: %r', msg)
220            util.info(' ... request was %r', request)
221            util.info(' ... exception was %r', e)
222
223        c.close()
224
225    def serve_client(self, conn):
226        '''
227        Handle requests from the proxies in a particular process/thread
228        '''
229        util.debug('starting server thread to service %r',
230                   threading.current_thread().name)
231
232        recv = conn.recv
233        send = conn.send
234        id_to_obj = self.id_to_obj
235
236        while not self.stop:
237
238            try:
239                methodname = obj = None
240                request = recv()
241                ident, methodname, args, kwds = request
242                obj, exposed, gettypeid = id_to_obj[ident]
243
244                if methodname not in exposed:
245                    raise AttributeError(
246                        'method %r of %r object is not in exposed=%r' %
247                        (methodname, type(obj), exposed)
248                        )
249
250                function = getattr(obj, methodname)
251
252                try:
253                    res = function(*args, **kwds)
254                except Exception, e:
255                    msg = ('#ERROR', e)
256                else:
257                    typeid = gettypeid and gettypeid.get(methodname, None)
258                    if typeid:
259                        rident, rexposed = self.create(conn, typeid, res)
260                        token = Token(typeid, self.address, rident)
261                        msg = ('#PROXY', (rexposed, token))
262                    else:
263                        msg = ('#RETURN', res)
264
265            except AttributeError:
266                if methodname is None:
267                    msg = ('#TRACEBACK', format_exc())
268                else:
269                    try:
270                        fallback_func = self.fallback_mapping[methodname]
271                        result = fallback_func(
272                            self, conn, ident, obj, *args, **kwds
273                            )
274                        msg = ('#RETURN', result)
275                    except Exception:
276                        msg = ('#TRACEBACK', format_exc())
277
278            except EOFError:
279                util.debug('got EOF -- exiting thread serving %r',
280                           threading.current_thread().name)
281                sys.exit(0)
282
283            except Exception:
284                msg = ('#TRACEBACK', format_exc())
285
286            try:
287                try:
288                    send(msg)
289                except Exception, e:
290                    send(('#UNSERIALIZABLE', format_exc()))
291            except Exception, e:
292                util.info('exception in thread serving %r',
293                        threading.current_thread().name)
294                util.info(' ... message was %r', msg)
295                util.info(' ... exception was %r', e)
296                conn.close()
297                sys.exit(1)
298
299    def fallback_getvalue(self, conn, ident, obj):
300        return obj
301
302    def fallback_str(self, conn, ident, obj):
303        return str(obj)
304
305    def fallback_repr(self, conn, ident, obj):
306        return repr(obj)
307
308    fallback_mapping = {
309        '__str__':fallback_str,
310        '__repr__':fallback_repr,
311        '#GETVALUE':fallback_getvalue
312        }
313
314    def dummy(self, c):
315        pass
316
317    def debug_info(self, c):
318        '''
319        Return some info --- useful to spot problems with refcounting
320        '''
321        self.mutex.acquire()
322        try:
323            result = []
324            keys = self.id_to_obj.keys()
325            keys.sort()
326            for ident in keys:
327                if ident != '0':
328                    result.append('  %s:       refcount=%s\n    %s' %
329                                  (ident, self.id_to_refcount[ident],
330                                   str(self.id_to_obj[ident][0])[:75]))
331            return '\n'.join(result)
332        finally:
333            self.mutex.release()
334
335    def number_of_objects(self, c):
336        '''
337        Number of shared objects
338        '''
339        return len(self.id_to_obj) - 1      # don't count ident='0'
340
341    def shutdown(self, c):
342        '''
343        Shutdown this process
344        '''
345        try:
346            try:
347                util.debug('manager received shutdown message')
348                c.send(('#RETURN', None))
349
350                if sys.stdout != sys.__stdout__:
351                    util.debug('resetting stdout, stderr')
352                    sys.stdout = sys.__stdout__
353                    sys.stderr = sys.__stderr__
354
355                util._run_finalizers(0)
356
357                for p in active_children():
358                    util.debug('terminating a child process of manager')
359                    p.terminate()
360
361                for p in active_children():
362                    util.debug('terminating a child process of manager')
363                    p.join()
364
365                util._run_finalizers()
366                util.info('manager exiting with exitcode 0')
367            except:
368                import traceback
369                traceback.print_exc()
370        finally:
371            exit(0)
372
373    def create(self, c, typeid, *args, **kwds):
374        '''
375        Create a new shared object and return its id
376        '''
377        self.mutex.acquire()
378        try:
379            callable, exposed, method_to_typeid, proxytype = \
380                      self.registry[typeid]
381
382            if callable is None:
383                assert len(args) == 1 and not kwds
384                obj = args[0]
385            else:
386                obj = callable(*args, **kwds)
387
388            if exposed is None:
389                exposed = public_methods(obj)
390            if method_to_typeid is not None:
391                assert type(method_to_typeid) is dict
392                exposed = list(exposed) + list(method_to_typeid)
393
394            ident = '%x' % id(obj)  # convert to string because xmlrpclib
395                                    # only has 32 bit signed integers
396            util.debug('%r callable returned object with id %r', typeid, ident)
397
398            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
399            if ident not in self.id_to_refcount:
400                self.id_to_refcount[ident] = 0
401            # increment the reference count immediately, to avoid
402            # this object being garbage collected before a Proxy
403            # object for it can be created.  The caller of create()
404            # is responsible for doing a decref once the Proxy object
405            # has been created.
406            self.incref(c, ident)
407            return ident, tuple(exposed)
408        finally:
409            self.mutex.release()
410
411    def get_methods(self, c, token):
412        '''
413        Return the methods of the shared object indicated by token
414        '''
415        return tuple(self.id_to_obj[token.id][1])
416
417    def accept_connection(self, c, name):
418        '''
419        Spawn a new thread to serve this connection
420        '''
421        threading.current_thread().name = name
422        c.send(('#RETURN', None))
423        self.serve_client(c)
424
425    def incref(self, c, ident):
426        self.mutex.acquire()
427        try:
428            self.id_to_refcount[ident] += 1
429        finally:
430            self.mutex.release()
431
432    def decref(self, c, ident):
433        self.mutex.acquire()
434        try:
435            assert self.id_to_refcount[ident] >= 1
436            self.id_to_refcount[ident] -= 1
437            if self.id_to_refcount[ident] == 0:
438                del self.id_to_obj[ident], self.id_to_refcount[ident]
439                util.debug('disposing of obj with id %r', ident)
440        finally:
441            self.mutex.release()
442
443#
444# Class to represent state of a manager
445#
446
447class State(object):
448    __slots__ = ['value']
449    INITIAL = 0
450    STARTED = 1
451    SHUTDOWN = 2
452
453#
454# Mapping from serializer name to Listener and Client types
455#
456
457listener_client = {
458    'pickle' : (connection.Listener, connection.Client),
459    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
460    }
461
462#
463# Definition of BaseManager
464#
465
466class BaseManager(object):
467    '''
468    Base class for managers
469    '''
470    _registry = {}
471    _Server = Server
472
473    def __init__(self, address=None, authkey=None, serializer='pickle'):
474        if authkey is None:
475            authkey = current_process().authkey
476        self._address = address     # XXX not final address if eg ('', 0)
477        self._authkey = AuthenticationString(authkey)
478        self._state = State()
479        self._state.value = State.INITIAL
480        self._serializer = serializer
481        self._Listener, self._Client = listener_client[serializer]
482
483    def __reduce__(self):
484        return type(self).from_address, \
485               (self._address, self._authkey, self._serializer)
486
487    def get_server(self):
488        '''
489        Return server object with serve_forever() method and address attribute
490        '''
491        assert self._state.value == State.INITIAL
492        return Server(self._registry, self._address,
493                      self._authkey, self._serializer)
494
495    def connect(self):
496        '''
497        Connect manager object to the server process
498        '''
499        Listener, Client = listener_client[self._serializer]
500        conn = Client(self._address, authkey=self._authkey)
501        dispatch(conn, None, 'dummy')
502        self._state.value = State.STARTED
503
504    def start(self, initializer=None, initargs=()):
505        '''
506        Spawn a server process for this manager object
507        '''
508        assert self._state.value == State.INITIAL
509
510        if initializer is not None and not hasattr(initializer, '__call__'):
511            raise TypeError('initializer must be a callable')
512
513        # pipe over which we will retrieve address of server
514        reader, writer = connection.Pipe(duplex=False)
515
516        # spawn process which runs a server
517        self._process = Process(
518            target=type(self)._run_server,
519            args=(self._registry, self._address, self._authkey,
520                  self._serializer, writer, initializer, initargs),
521            )
522        ident = ':'.join(str(i) for i in self._process._identity)
523        self._process.name = type(self).__name__  + '-' + ident
524        self._process.start()
525
526        # get address of server
527        writer.close()
528        self._address = reader.recv()
529        reader.close()
530
531        # register a finalizer
532        self._state.value = State.STARTED
533        self.shutdown = util.Finalize(
534            self, type(self)._finalize_manager,
535            args=(self._process, self._address, self._authkey,
536                  self._state, self._Client),
537            exitpriority=0
538            )
539
540    @classmethod
541    def _run_server(cls, registry, address, authkey, serializer, writer,
542                    initializer=None, initargs=()):
543        '''
544        Create a server, report its address and run it
545        '''
546        if initializer is not None:
547            initializer(*initargs)
548
549        # create server
550        server = cls._Server(registry, address, authkey, serializer)
551
552        # inform parent process of the server's address
553        writer.send(server.address)
554        writer.close()
555
556        # run the manager
557        util.info('manager serving at %r', server.address)
558        server.serve_forever()
559
560    def _create(self, typeid, *args, **kwds):
561        '''
562        Create a new shared object; return the token and exposed tuple
563        '''
564        assert self._state.value == State.STARTED, 'server not yet started'
565        conn = self._Client(self._address, authkey=self._authkey)
566        try:
567            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
568        finally:
569            conn.close()
570        return Token(typeid, self._address, id), exposed
571
572    def join(self, timeout=None):
573        '''
574        Join the manager process (if it has been spawned)
575        '''
576        self._process.join(timeout)
577
578    def _debug_info(self):
579        '''
580        Return some info about the servers shared objects and connections
581        '''
582        conn = self._Client(self._address, authkey=self._authkey)
583        try:
584            return dispatch(conn, None, 'debug_info')
585        finally:
586            conn.close()
587
588    def _number_of_objects(self):
589        '''
590        Return the number of shared objects
591        '''
592        conn = self._Client(self._address, authkey=self._authkey)
593        try:
594            return dispatch(conn, None, 'number_of_objects')
595        finally:
596            conn.close()
597
598    def __enter__(self):
599        return self
600
601    def __exit__(self, exc_type, exc_val, exc_tb):
602        self.shutdown()
603
604    @staticmethod
605    def _finalize_manager(process, address, authkey, state, _Client):
606        '''
607        Shutdown the manager process; will be registered as a finalizer
608        '''
609        if process.is_alive():
610            util.info('sending shutdown message to manager')
611            try:
612                conn = _Client(address, authkey=authkey)
613                try:
614                    dispatch(conn, None, 'shutdown')
615                finally:
616                    conn.close()
617            except Exception:
618                pass
619
620            process.join(timeout=0.2)
621            if process.is_alive():
622                util.info('manager still alive')
623                if hasattr(process, 'terminate'):
624                    util.info('trying to `terminate()` manager process')
625                    process.terminate()
626                    process.join(timeout=0.1)
627                    if process.is_alive():
628                        util.info('manager still alive after terminate')
629
630        state.value = State.SHUTDOWN
631        try:
632            del BaseProxy._address_to_local[address]
633        except KeyError:
634            pass
635
636    address = property(lambda self: self._address)
637
638    @classmethod
639    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
640                 method_to_typeid=None, create_method=True):
641        '''
642        Register a typeid with the manager type
643        '''
644        if '_registry' not in cls.__dict__:
645            cls._registry = cls._registry.copy()
646
647        if proxytype is None:
648            proxytype = AutoProxy
649
650        exposed = exposed or getattr(proxytype, '_exposed_', None)
651
652        method_to_typeid = method_to_typeid or \
653                           getattr(proxytype, '_method_to_typeid_', None)
654
655        if method_to_typeid:
656            for key, value in method_to_typeid.items():
657                assert type(key) is str, '%r is not a string' % key
658                assert type(value) is str, '%r is not a string' % value
659
660        cls._registry[typeid] = (
661            callable, exposed, method_to_typeid, proxytype
662            )
663
664        if create_method:
665            def temp(self, *args, **kwds):
666                util.debug('requesting creation of a shared %r object', typeid)
667                token, exp = self._create(typeid, *args, **kwds)
668                proxy = proxytype(
669                    token, self._serializer, manager=self,
670                    authkey=self._authkey, exposed=exp
671                    )
672                conn = self._Client(token.address, authkey=self._authkey)
673                dispatch(conn, None, 'decref', (token.id,))
674                return proxy
675            temp.__name__ = typeid
676            setattr(cls, typeid, temp)
677
678#
679# Subclass of set which get cleared after a fork
680#
681
682class ProcessLocalSet(set):
683    def __init__(self):
684        util.register_after_fork(self, lambda obj: obj.clear())
685    def __reduce__(self):
686        return type(self), ()
687
688#
689# Definition of BaseProxy
690#
691
692class BaseProxy(object):
693    '''
694    A base for proxies of shared objects
695    '''
696    _address_to_local = {}
697    _mutex = util.ForkAwareThreadLock()
698
699    def __init__(self, token, serializer, manager=None,
700                 authkey=None, exposed=None, incref=True):
701        BaseProxy._mutex.acquire()
702        try:
703            tls_idset = BaseProxy._address_to_local.get(token.address, None)
704            if tls_idset is None:
705                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
706                BaseProxy._address_to_local[token.address] = tls_idset
707        finally:
708            BaseProxy._mutex.release()
709
710        # self._tls is used to record the connection used by this
711        # thread to communicate with the manager at token.address
712        self._tls = tls_idset[0]
713
714        # self._idset is used to record the identities of all shared
715        # objects for which the current process owns references and
716        # which are in the manager at token.address
717        self._idset = tls_idset[1]
718
719        self._token = token
720        self._id = self._token.id
721        self._manager = manager
722        self._serializer = serializer
723        self._Client = listener_client[serializer][1]
724
725        if authkey is not None:
726            self._authkey = AuthenticationString(authkey)
727        elif self._manager is not None:
728            self._authkey = self._manager._authkey
729        else:
730            self._authkey = current_process().authkey
731
732        if incref:
733            self._incref()
734
735        util.register_after_fork(self, BaseProxy._after_fork)
736
737    def _connect(self):
738        util.debug('making connection to manager')
739        name = current_process().name
740        if threading.current_thread().name != 'MainThread':
741            name += '|' + threading.current_thread().name
742        conn = self._Client(self._token.address, authkey=self._authkey)
743        dispatch(conn, None, 'accept_connection', (name,))
744        self._tls.connection = conn
745
746    def _callmethod(self, methodname, args=(), kwds={}):
747        '''
748        Try to call a method of the referrent and return a copy of the result
749        '''
750        try:
751            conn = self._tls.connection
752        except AttributeError:
753            util.debug('thread %r does not own a connection',
754                       threading.current_thread().name)
755            self._connect()
756            conn = self._tls.connection
757
758        conn.send((self._id, methodname, args, kwds))
759        kind, result = conn.recv()
760
761        if kind == '#RETURN':
762            return result
763        elif kind == '#PROXY':
764            exposed, token = result
765            proxytype = self._manager._registry[token.typeid][-1]
766            token.address = self._token.address
767            proxy = proxytype(
768                token, self._serializer, manager=self._manager,
769                authkey=self._authkey, exposed=exposed
770                )
771            conn = self._Client(token.address, authkey=self._authkey)
772            dispatch(conn, None, 'decref', (token.id,))
773            return proxy
774        raise convert_to_error(kind, result)
775
776    def _getvalue(self):
777        '''
778        Get a copy of the value of the referent
779        '''
780        return self._callmethod('#GETVALUE')
781
782    def _incref(self):
783        conn = self._Client(self._token.address, authkey=self._authkey)
784        dispatch(conn, None, 'incref', (self._id,))
785        util.debug('INCREF %r', self._token.id)
786
787        self._idset.add(self._id)
788
789        state = self._manager and self._manager._state
790
791        self._close = util.Finalize(
792            self, BaseProxy._decref,
793            args=(self._token, self._authkey, state,
794                  self._tls, self._idset, self._Client),
795            exitpriority=10
796            )
797
798    @staticmethod
799    def _decref(token, authkey, state, tls, idset, _Client):
800        idset.discard(token.id)
801
802        # check whether manager is still alive
803        if state is None or state.value == State.STARTED:
804            # tell manager this process no longer cares about referent
805            try:
806                util.debug('DECREF %r', token.id)
807                conn = _Client(token.address, authkey=authkey)
808                dispatch(conn, None, 'decref', (token.id,))
809            except Exception, e:
810                util.debug('... decref failed %s', e)
811
812        else:
813            util.debug('DECREF %r -- manager already shutdown', token.id)
814
815        # check whether we can close this thread's connection because
816        # the process owns no more references to objects for this manager
817        if not idset and hasattr(tls, 'connection'):
818            util.debug('thread %r has no more proxies so closing conn',
819                       threading.current_thread().name)
820            tls.connection.close()
821            del tls.connection
822
823    def _after_fork(self):
824        self._manager = None
825        try:
826            self._incref()
827        except Exception, e:
828            # the proxy may just be for a manager which has shutdown
829            util.info('incref failed: %s' % e)
830
831    def __reduce__(self):
832        kwds = {}
833        if Popen.thread_is_spawning():
834            kwds['authkey'] = self._authkey
835
836        if getattr(self, '_isauto', False):
837            kwds['exposed'] = self._exposed_
838            return (RebuildProxy,
839                    (AutoProxy, self._token, self._serializer, kwds))
840        else:
841            return (RebuildProxy,
842                    (type(self), self._token, self._serializer, kwds))
843
844    def __deepcopy__(self, memo):
845        return self._getvalue()
846
847    def __repr__(self):
848        return '<%s object, typeid %r at %s>' % \
849               (type(self).__name__, self._token.typeid, '0x%x' % id(self))
850
851    def __str__(self):
852        '''
853        Return representation of the referent (or a fall-back if that fails)
854        '''
855        try:
856            return self._callmethod('__repr__')
857        except Exception:
858            return repr(self)[:-1] + "; '__str__()' failed>"
859
860#
861# Function used for unpickling
862#
863
864def RebuildProxy(func, token, serializer, kwds):
865    '''
866    Function used for unpickling proxy objects.
867
868    If possible the shared object is returned, or otherwise a proxy for it.
869    '''
870    server = getattr(current_process(), '_manager_server', None)
871
872    if server and server.address == token.address:
873        return server.id_to_obj[token.id][0]
874    else:
875        incref = (
876            kwds.pop('incref', True) and
877            not getattr(current_process(), '_inheriting', False)
878            )
879        return func(token, serializer, incref=incref, **kwds)
880
881#
882# Functions to create proxies and proxy types
883#
884
885def MakeProxyType(name, exposed, _cache={}):
886    '''
887    Return a proxy type whose methods are given by `exposed`
888    '''
889    exposed = tuple(exposed)
890    try:
891        return _cache[(name, exposed)]
892    except KeyError:
893        pass
894
895    dic = {}
896
897    for meth in exposed:
898        exec '''def %s(self, *args, **kwds):
899        return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
900
901    ProxyType = type(name, (BaseProxy,), dic)
902    ProxyType._exposed_ = exposed
903    _cache[(name, exposed)] = ProxyType
904    return ProxyType
905
906
907def AutoProxy(token, serializer, manager=None, authkey=None,
908              exposed=None, incref=True):
909    '''
910    Return an auto-proxy for `token`
911    '''
912    _Client = listener_client[serializer][1]
913
914    if exposed is None:
915        conn = _Client(token.address, authkey=authkey)
916        try:
917            exposed = dispatch(conn, None, 'get_methods', (token,))
918        finally:
919            conn.close()
920
921    if authkey is None and manager is not None:
922        authkey = manager._authkey
923    if authkey is None:
924        authkey = current_process().authkey
925
926    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
927    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
928                      incref=incref)
929    proxy._isauto = True
930    return proxy
931
932#
933# Types/callables which we will register with SyncManager
934#
935
936class Namespace(object):
937    def __init__(self, **kwds):
938        self.__dict__.update(kwds)
939    def __repr__(self):
940        items = self.__dict__.items()
941        temp = []
942        for name, value in items:
943            if not name.startswith('_'):
944                temp.append('%s=%r' % (name, value))
945        temp.sort()
946        return 'Namespace(%s)' % str.join(', ', temp)
947
948class Value(object):
949    def __init__(self, typecode, value, lock=True):
950        self._typecode = typecode
951        self._value = value
952    def get(self):
953        return self._value
954    def set(self, value):
955        self._value = value
956    def __repr__(self):
957        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
958    value = property(get, set)
959
960def Array(typecode, sequence, lock=True):
961    return array.array(typecode, sequence)
962
963#
964# Proxy types used by SyncManager
965#
966
967class IteratorProxy(BaseProxy):
968    # XXX remove methods for Py3.0 and Py2.6
969    _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
970    def __iter__(self):
971        return self
972    def __next__(self, *args):
973        return self._callmethod('__next__', args)
974    def next(self, *args):
975        return self._callmethod('next', args)
976    def send(self, *args):
977        return self._callmethod('send', args)
978    def throw(self, *args):
979        return self._callmethod('throw', args)
980    def close(self, *args):
981        return self._callmethod('close', args)
982
983
984class AcquirerProxy(BaseProxy):
985    _exposed_ = ('acquire', 'release')
986    def acquire(self, blocking=True):
987        return self._callmethod('acquire', (blocking,))
988    def release(self):
989        return self._callmethod('release')
990    def __enter__(self):
991        return self._callmethod('acquire')
992    def __exit__(self, exc_type, exc_val, exc_tb):
993        return self._callmethod('release')
994
995
996class ConditionProxy(AcquirerProxy):
997    # XXX will Condition.notfyAll() name be available in Py3.0?
998    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
999    def wait(self, timeout=None):
1000        return self._callmethod('wait', (timeout,))
1001    def notify(self):
1002        return self._callmethod('notify')
1003    def notify_all(self):
1004        return self._callmethod('notify_all')
1005
1006class EventProxy(BaseProxy):
1007    _exposed_ = ('is_set', 'set', 'clear', 'wait')
1008    def is_set(self):
1009        return self._callmethod('is_set')
1010    def set(self):
1011        return self._callmethod('set')
1012    def clear(self):
1013        return self._callmethod('clear')
1014    def wait(self, timeout=None):
1015        return self._callmethod('wait', (timeout,))
1016
1017class NamespaceProxy(BaseProxy):
1018    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1019    def __getattr__(self, key):
1020        if key[0] == '_':
1021            return object.__getattribute__(self, key)
1022        callmethod = object.__getattribute__(self, '_callmethod')
1023        return callmethod('__getattribute__', (key,))
1024    def __setattr__(self, key, value):
1025        if key[0] == '_':
1026            return object.__setattr__(self, key, value)
1027        callmethod = object.__getattribute__(self, '_callmethod')
1028        return callmethod('__setattr__', (key, value))
1029    def __delattr__(self, key):
1030        if key[0] == '_':
1031            return object.__delattr__(self, key)
1032        callmethod = object.__getattribute__(self, '_callmethod')
1033        return callmethod('__delattr__', (key,))
1034
1035
1036class ValueProxy(BaseProxy):
1037    _exposed_ = ('get', 'set')
1038    def get(self):
1039        return self._callmethod('get')
1040    def set(self, value):
1041        return self._callmethod('set', (value,))
1042    value = property(get, set)
1043
1044
1045BaseListProxy = MakeProxyType('BaseListProxy', (
1046    '__add__', '__contains__', '__delitem__', '__delslice__',
1047    '__getitem__', '__getslice__', '__len__', '__mul__',
1048    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1049    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1050    'reverse', 'sort', '__imul__'
1051    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1052class ListProxy(BaseListProxy):
1053    def __iadd__(self, value):
1054        self._callmethod('extend', (value,))
1055        return self
1056    def __imul__(self, value):
1057        self._callmethod('__imul__', (value,))
1058        return self
1059
1060
1061DictProxy = MakeProxyType('DictProxy', (
1062    '__contains__', '__delitem__', '__getitem__', '__len__',
1063    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1064    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1065    ))
1066
1067
1068ArrayProxy = MakeProxyType('ArrayProxy', (
1069    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1070    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1071
1072
1073PoolProxy = MakeProxyType('PoolProxy', (
1074    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1075    'map', 'map_async', 'terminate'
1076    ))
1077PoolProxy._method_to_typeid_ = {
1078    'apply_async': 'AsyncResult',
1079    'map_async': 'AsyncResult',
1080    'imap': 'Iterator',
1081    'imap_unordered': 'Iterator'
1082    }
1083
1084#
1085# Definition of SyncManager
1086#
1087
1088class SyncManager(BaseManager):
1089    '''
1090    Subclass of `BaseManager` which supports a number of shared object types.
1091
1092    The types registered are those intended for the synchronization
1093    of threads, plus `dict`, `list` and `Namespace`.
1094
1095    The `multiprocessing.Manager()` function creates started instances of
1096    this class.
1097    '''
1098
1099SyncManager.register('Queue', Queue.Queue)
1100SyncManager.register('JoinableQueue', Queue.Queue)
1101SyncManager.register('Event', threading.Event, EventProxy)
1102SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1103SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1104SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1105SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1106                     AcquirerProxy)
1107SyncManager.register('Condition', threading.Condition, ConditionProxy)
1108SyncManager.register('Pool', Pool, PoolProxy)
1109SyncManager.register('list', list, ListProxy)
1110SyncManager.register('dict', dict, DictProxy)
1111SyncManager.register('Value', Value, ValueProxy)
1112SyncManager.register('Array', Array, ArrayProxy)
1113SyncManager.register('Namespace', Namespace, NamespaceProxy)
1114
1115# types returned by methods of PoolProxy
1116SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1117SyncManager.register('AsyncResult', create_method=False)
1118