• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module providing manager classes for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# Licensed to PSF under a Contributor Agreement.
9#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
17import sys
18import threading
19import signal
20import array
21import queue
22import time
23import types
24import os
25from os import getpid
26
27from traceback import format_exc
28
29from . import connection
30from .context import reduction, get_spawning_popen, ProcessError
31from . import pool
32from . import process
33from . import util
34from . import get_context
35try:
36    from . import shared_memory
37except ImportError:
38    HAS_SHMEM = False
39else:
40    HAS_SHMEM = True
41    __all__.append('SharedMemoryManager')
42
43#
44# Register some things for pickling
45#
46
def reduce_array(a):
    '''
    Reduce an array.array to (constructor, (typecode, raw bytes))
    '''
    typecode = a.typecode
    payload = a.tobytes()
    return array.array, (typecode, payload)
reduction.register(array.array, reduce_array)

# Register a reducer for the types returned by dict.items()/keys()/values()
# so that such objects are pickled as plain lists of their contents.
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # i.e. dict methods return views, not lists
    def rebuild_as_list(obj):
        # Reduce `obj` to a call of list() on a list copy of its contents.
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
57
58#
59# Type for identifying shared objects
60#
61
class Token(object):
    '''
    Identifies a shared object by the triple (typeid, address, id)
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid, self.address, self.id = typeid, address, id

    def __getstate__(self):
        # State is the plain triple, in slot order.
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Inverse of __getstate__(): `state` must unpack into three values.
        typeid, address, ident = state
        self.typeid, self.address, self.id = typeid, address, ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
80
81#
82# Function for communication with a manager's server process
83#
84
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send the request (id, methodname, args, kwds) over connection `c` and
    return the result of a '#RETURN' reply; any other reply kind is turned
    into an exception by convert_to_error() and raised
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
94
def convert_to_error(kind, result):
    '''
    Return (not raise) the exception object that corresponds to a reply
    of the given `kind`; raise TypeError if a traceback-style reply does
    not carry a str
    '''
    if kind == '#ERROR':
        # The reply already carries the exception instance.
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
109
class RemoteError(Exception):
    '''
    Exception whose string form frames its first argument between two
    75-character dashed rules
    '''
    def __str__(self):
        rule = '-' * 75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
113
114#
115# Functions for finding the method names of an object
116#
117
def all_methods(obj):
    '''
    Return a list of the names of all callable attributes of `obj`
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
128
def public_methods(obj):
    '''
    Return a list of the callable attribute names of `obj` whose first
    character is not an underscore
    '''
    return list(filter(lambda name: name[0] != '_', all_methods(obj)))
134
135#
136# Server which is run in a process controlled by a manager
137#
138
139class Server(object):
140    '''
141    Server class which runs in a process controlled by a manager object
142    '''
143    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
144              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
145
146    def __init__(self, registry, address, authkey, serializer):
147        if not isinstance(authkey, bytes):
148            raise TypeError(
149                "Authkey {0!r} is type {1!s}, not bytes".format(
150                    authkey, type(authkey)))
151        self.registry = registry
152        self.authkey = process.AuthenticationString(authkey)
153        Listener, Client = listener_client[serializer]
154
155        # do authentication later
156        self.listener = Listener(address=address, backlog=16)
157        self.address = self.listener.address
158
159        self.id_to_obj = {'0': (None, ())}
160        self.id_to_refcount = {}
161        self.id_to_local_proxy_obj = {}
162        self.mutex = threading.Lock()
163
164    def serve_forever(self):
165        '''
166        Run the server forever
167        '''
168        self.stop_event = threading.Event()
169        process.current_process()._manager_server = self
170        try:
171            accepter = threading.Thread(target=self.accepter)
172            accepter.daemon = True
173            accepter.start()
174            try:
175                while not self.stop_event.is_set():
176                    self.stop_event.wait(1)
177            except (KeyboardInterrupt, SystemExit):
178                pass
179        finally:
180            if sys.stdout != sys.__stdout__: # what about stderr?
181                util.debug('resetting stdout, stderr')
182                sys.stdout = sys.__stdout__
183                sys.stderr = sys.__stderr__
184            sys.exit(0)
185
186    def accepter(self):
187        while True:
188            try:
189                c = self.listener.accept()
190            except OSError:
191                continue
192            t = threading.Thread(target=self.handle_request, args=(c,))
193            t.daemon = True
194            t.start()
195
196    def _handle_request(self, c):
197        request = None
198        try:
199            connection.deliver_challenge(c, self.authkey)
200            connection.answer_challenge(c, self.authkey)
201            request = c.recv()
202            ignore, funcname, args, kwds = request
203            assert funcname in self.public, '%r unrecognized' % funcname
204            func = getattr(self, funcname)
205        except Exception:
206            msg = ('#TRACEBACK', format_exc())
207        else:
208            try:
209                result = func(c, *args, **kwds)
210            except Exception:
211                msg = ('#TRACEBACK', format_exc())
212            else:
213                msg = ('#RETURN', result)
214
215        try:
216            c.send(msg)
217        except Exception as e:
218            try:
219                c.send(('#TRACEBACK', format_exc()))
220            except Exception:
221                pass
222            util.info('Failure to send message: %r', msg)
223            util.info(' ... request was %r', request)
224            util.info(' ... exception was %r', e)
225
226    def handle_request(self, conn):
227        '''
228        Handle a new connection
229        '''
230        try:
231            self._handle_request(conn)
232        except SystemExit:
233            # Server.serve_client() calls sys.exit(0) on EOF
234            pass
235        finally:
236            conn.close()
237
    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is a tuple
        (ident, methodname, args, kwds); the reply is one of
        ('#RETURN', value), ('#PROXY', (exposed, token)), ('#ERROR', exc),
        ('#TRACEBACK', text) or ('#UNSERIALIZABLE', text).  The thread exits
        via sys.exit(0) on EOF and sys.exit(1) if no reply can be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                # Reset first so the AttributeError handler below can tell
                # whether the request was decoded before the failure.
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Not in the main table: fall back to the local-proxy
                    # table, re-raising the original KeyError if absent.
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # Exceptions raised by the referent's method are sent
                    # back as objects rather than as traceback text.
                    msg = ('#ERROR', e)
                else:
                    # If the method is mapped to a typeid, register the
                    # result as a new shared object and reply with a token
                    # for it instead of the value.
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    # Failed before the request was unpacked.
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # Try the handlers in fallback_mapping; a missing entry
                    # (KeyError) is reported as a traceback like any other
                    # failure here.
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    # Sending the reply failed; send the failure's
                    # traceback text instead.
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)
318
319    def fallback_getvalue(self, conn, ident, obj):
320        return obj
321
322    def fallback_str(self, conn, ident, obj):
323        return str(obj)
324
325    def fallback_repr(self, conn, ident, obj):
326        return repr(obj)
327
328    fallback_mapping = {
329        '__str__':fallback_str,
330        '__repr__':fallback_repr,
331        '#GETVALUE':fallback_getvalue
332        }
333
334    def dummy(self, c):
335        pass
336
337    def debug_info(self, c):
338        '''
339        Return some info --- useful to spot problems with refcounting
340        '''
341        # Perhaps include debug info about 'c'?
342        with self.mutex:
343            result = []
344            keys = list(self.id_to_refcount.keys())
345            keys.sort()
346            for ident in keys:
347                if ident != '0':
348                    result.append('  %s:       refcount=%s\n    %s' %
349                                  (ident, self.id_to_refcount[ident],
350                                   str(self.id_to_obj[ident][0])[:75]))
351            return '\n'.join(result)
352
353    def number_of_objects(self, c):
354        '''
355        Number of shared objects
356        '''
357        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
358        return len(self.id_to_refcount)
359
360    def shutdown(self, c):
361        '''
362        Shutdown this process
363        '''
364        try:
365            util.debug('manager received shutdown message')
366            c.send(('#RETURN', None))
367        except:
368            import traceback
369            traceback.print_exc()
370        finally:
371            self.stop_event.set()
372
373    def create(self, c, typeid, /, *args, **kwds):
374        '''
375        Create a new shared object and return its id
376        '''
377        with self.mutex:
378            callable, exposed, method_to_typeid, proxytype = \
379                      self.registry[typeid]
380
381            if callable is None:
382                if kwds or (len(args) != 1):
383                    raise ValueError(
384                        "Without callable, must have one non-keyword argument")
385                obj = args[0]
386            else:
387                obj = callable(*args, **kwds)
388
389            if exposed is None:
390                exposed = public_methods(obj)
391            if method_to_typeid is not None:
392                if not isinstance(method_to_typeid, dict):
393                    raise TypeError(
394                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
395                            method_to_typeid, type(method_to_typeid)))
396                exposed = list(exposed) + list(method_to_typeid)
397
398            ident = '%x' % id(obj)  # convert to string because xmlrpclib
399                                    # only has 32 bit signed integers
400            util.debug('%r callable returned object with id %r', typeid, ident)
401
402            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
403            if ident not in self.id_to_refcount:
404                self.id_to_refcount[ident] = 0
405
406        self.incref(c, ident)
407        return ident, tuple(exposed)
408
409    def get_methods(self, c, token):
410        '''
411        Return the methods of the shared object indicated by token
412        '''
413        return tuple(self.id_to_obj[token.id][1])
414
415    def accept_connection(self, c, name):
416        '''
417        Spawn a new thread to serve this connection
418        '''
419        threading.current_thread().name = name
420        c.send(('#RETURN', None))
421        self.serve_client(c)
422
    def incref(self, c, ident):
        '''
        Add one reference to the object `ident`.

        Raises KeyError if `ident` has no refcount entry and is not present
        in id_to_local_proxy_obj either.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    # NOTE(review): the three names unpacked here are not
                    # used again in this method.
                    obj, exposed, gettypeid = self.id_to_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke
440
    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`; when its count reaches
        zero, remove its refcount entry and then its id_to_obj entry.

        Raises AssertionError if the stored refcount is not positive, and
        KeyError if `ident` has no refcount entry (unless it is present in
        id_to_local_proxy_obj, in which case the call is a no-op).
        '''
        # NOTE(review): this membership test runs before the mutex is taken.
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
467
468
469#
470# Class to represent state of a manager
471#
472
class State(object):
    '''
    Holder for a single `value` attribute, one of the constants below
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
478
479#
480# Mapping from serializer name to Listener and Client types
481#
482
483listener_client = {
484    'pickle' : (connection.Listener, connection.Client),
485    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
486    }
487
488#
489# Definition of BaseManager
490#
491
492class BaseManager(object):
493    '''
494    Base class for managers
495    '''
496    _registry = {}
497    _Server = Server
498
499    def __init__(self, address=None, authkey=None, serializer='pickle',
500                 ctx=None):
501        if authkey is None:
502            authkey = process.current_process().authkey
503        self._address = address     # XXX not final address if eg ('', 0)
504        self._authkey = process.AuthenticationString(authkey)
505        self._state = State()
506        self._state.value = State.INITIAL
507        self._serializer = serializer
508        self._Listener, self._Client = listener_client[serializer]
509        self._ctx = ctx or get_context()
510
511    def get_server(self):
512        '''
513        Return server object with serve_forever() method and address attribute
514        '''
515        if self._state.value != State.INITIAL:
516            if self._state.value == State.STARTED:
517                raise ProcessError("Already started server")
518            elif self._state.value == State.SHUTDOWN:
519                raise ProcessError("Manager has shut down")
520            else:
521                raise ProcessError(
522                    "Unknown state {!r}".format(self._state.value))
523        return Server(self._registry, self._address,
524                      self._authkey, self._serializer)
525
526    def connect(self):
527        '''
528        Connect manager object to the server process
529        '''
530        Listener, Client = listener_client[self._serializer]
531        conn = Client(self._address, authkey=self._authkey)
532        dispatch(conn, None, 'dummy')
533        self._state.value = State.STARTED
534
535    def start(self, initializer=None, initargs=()):
536        '''
537        Spawn a server process for this manager object
538        '''
539        if self._state.value != State.INITIAL:
540            if self._state.value == State.STARTED:
541                raise ProcessError("Already started server")
542            elif self._state.value == State.SHUTDOWN:
543                raise ProcessError("Manager has shut down")
544            else:
545                raise ProcessError(
546                    "Unknown state {!r}".format(self._state.value))
547
548        if initializer is not None and not callable(initializer):
549            raise TypeError('initializer must be a callable')
550
551        # pipe over which we will retrieve address of server
552        reader, writer = connection.Pipe(duplex=False)
553
554        # spawn process which runs a server
555        self._process = self._ctx.Process(
556            target=type(self)._run_server,
557            args=(self._registry, self._address, self._authkey,
558                  self._serializer, writer, initializer, initargs),
559            )
560        ident = ':'.join(str(i) for i in self._process._identity)
561        self._process.name = type(self).__name__  + '-' + ident
562        self._process.start()
563
564        # get address of server
565        writer.close()
566        self._address = reader.recv()
567        reader.close()
568
569        # register a finalizer
570        self._state.value = State.STARTED
571        self.shutdown = util.Finalize(
572            self, type(self)._finalize_manager,
573            args=(self._process, self._address, self._authkey,
574                  self._state, self._Client),
575            exitpriority=0
576            )
577
578    @classmethod
579    def _run_server(cls, registry, address, authkey, serializer, writer,
580                    initializer=None, initargs=()):
581        '''
582        Create a server, report its address and run it
583        '''
584        # bpo-36368: protect server process from KeyboardInterrupt signals
585        signal.signal(signal.SIGINT, signal.SIG_IGN)
586
587        if initializer is not None:
588            initializer(*initargs)
589
590        # create server
591        server = cls._Server(registry, address, authkey, serializer)
592
593        # inform parent process of the server's address
594        writer.send(server.address)
595        writer.close()
596
597        # run the manager
598        util.info('manager serving at %r', server.address)
599        server.serve_forever()
600
601    def _create(self, typeid, /, *args, **kwds):
602        '''
603        Create a new shared object; return the token and exposed tuple
604        '''
605        assert self._state.value == State.STARTED, 'server not yet started'
606        conn = self._Client(self._address, authkey=self._authkey)
607        try:
608            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
609        finally:
610            conn.close()
611        return Token(typeid, self._address, id), exposed
612
613    def join(self, timeout=None):
614        '''
615        Join the manager process (if it has been spawned)
616        '''
617        if self._process is not None:
618            self._process.join(timeout)
619            if not self._process.is_alive():
620                self._process = None
621
622    def _debug_info(self):
623        '''
624        Return some info about the servers shared objects and connections
625        '''
626        conn = self._Client(self._address, authkey=self._authkey)
627        try:
628            return dispatch(conn, None, 'debug_info')
629        finally:
630            conn.close()
631
632    def _number_of_objects(self):
633        '''
634        Return the number of shared objects
635        '''
636        conn = self._Client(self._address, authkey=self._authkey)
637        try:
638            return dispatch(conn, None, 'number_of_objects')
639        finally:
640            conn.close()
641
642    def __enter__(self):
643        if self._state.value == State.INITIAL:
644            self.start()
645        if self._state.value != State.STARTED:
646            if self._state.value == State.INITIAL:
647                raise ProcessError("Unable to start server")
648            elif self._state.value == State.SHUTDOWN:
649                raise ProcessError("Manager has shut down")
650            else:
651                raise ProcessError(
652                    "Unknown state {!r}".format(self._state.value))
653        return self
654
655    def __exit__(self, exc_type, exc_val, exc_tb):
656        self.shutdown()
657
658    @staticmethod
659    def _finalize_manager(process, address, authkey, state, _Client):
660        '''
661        Shutdown the manager process; will be registered as a finalizer
662        '''
663        if process.is_alive():
664            util.info('sending shutdown message to manager')
665            try:
666                conn = _Client(address, authkey=authkey)
667                try:
668                    dispatch(conn, None, 'shutdown')
669                finally:
670                    conn.close()
671            except Exception:
672                pass
673
674            process.join(timeout=1.0)
675            if process.is_alive():
676                util.info('manager still alive')
677                if hasattr(process, 'terminate'):
678                    util.info('trying to `terminate()` manager process')
679                    process.terminate()
680                    process.join(timeout=0.1)
681                    if process.is_alive():
682                        util.info('manager still alive after terminate')
683
684        state.value = State.SHUTDOWN
685        try:
686            del BaseProxy._address_to_local[address]
687        except KeyError:
688            pass
689
690    @property
691    def address(self):
692        return self._address
693
694    @classmethod
695    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
696                 method_to_typeid=None, create_method=True):
697        '''
698        Register a typeid with the manager type
699        '''
700        if '_registry' not in cls.__dict__:
701            cls._registry = cls._registry.copy()
702
703        if proxytype is None:
704            proxytype = AutoProxy
705
706        exposed = exposed or getattr(proxytype, '_exposed_', None)
707
708        method_to_typeid = method_to_typeid or \
709                           getattr(proxytype, '_method_to_typeid_', None)
710
711        if method_to_typeid:
712            for key, value in list(method_to_typeid.items()): # isinstance?
713                assert type(key) is str, '%r is not a string' % key
714                assert type(value) is str, '%r is not a string' % value
715
716        cls._registry[typeid] = (
717            callable, exposed, method_to_typeid, proxytype
718            )
719
720        if create_method:
721            def temp(self, /, *args, **kwds):
722                util.debug('requesting creation of a shared %r object', typeid)
723                token, exp = self._create(typeid, *args, **kwds)
724                proxy = proxytype(
725                    token, self._serializer, manager=self,
726                    authkey=self._authkey, exposed=exp
727                    )
728                conn = self._Client(token.address, authkey=self._authkey)
729                dispatch(conn, None, 'decref', (token.id,))
730                return proxy
731            temp.__name__ = typeid
732            setattr(cls, typeid, temp)
733
734#
735# Subclass of set which get cleared after a fork
736#
737
class ProcessLocalSet(set):
    '''
    A set that registers an after-fork callback which clears it, and that
    reduces to an empty instance of its own type
    '''
    def __init__(self):
        def _clear(obj):
            obj.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        # Contents are deliberately not included.
        return (type(self), ())
743
744#
745# Definition of BaseProxy
746#
747
748class BaseProxy(object):
749    '''
750    A base for proxies of shared objects
751    '''
752    _address_to_local = {}
753    _mutex = util.ForkAwareThreadLock()
754
755    def __init__(self, token, serializer, manager=None,
756                 authkey=None, exposed=None, incref=True, manager_owned=False):
757        with BaseProxy._mutex:
758            tls_idset = BaseProxy._address_to_local.get(token.address, None)
759            if tls_idset is None:
760                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
761                BaseProxy._address_to_local[token.address] = tls_idset
762
763        # self._tls is used to record the connection used by this
764        # thread to communicate with the manager at token.address
765        self._tls = tls_idset[0]
766
767        # self._idset is used to record the identities of all shared
768        # objects for which the current process owns references and
769        # which are in the manager at token.address
770        self._idset = tls_idset[1]
771
772        self._token = token
773        self._id = self._token.id
774        self._manager = manager
775        self._serializer = serializer
776        self._Client = listener_client[serializer][1]
777
778        # Should be set to True only when a proxy object is being created
779        # on the manager server; primary use case: nested proxy objects.
780        # RebuildProxy detects when a proxy is being created on the manager
781        # and sets this value appropriately.
782        self._owned_by_manager = manager_owned
783
784        if authkey is not None:
785            self._authkey = process.AuthenticationString(authkey)
786        elif self._manager is not None:
787            self._authkey = self._manager._authkey
788        else:
789            self._authkey = process.current_process().authkey
790
791        if incref:
792            self._incref()
793
794        util.register_after_fork(self, BaseProxy._after_fork)
795
796    def _connect(self):
797        util.debug('making connection to manager')
798        name = process.current_process().name
799        if threading.current_thread().name != 'MainThread':
800            name += '|' + threading.current_thread().name
801        conn = self._Client(self._token.address, authkey=self._authkey)
802        dispatch(conn, None, 'accept_connection', (name,))
803        self._tls.connection = conn
804
805    def _callmethod(self, methodname, args=(), kwds={}):
806        '''
807        Try to call a method of the referent and return a copy of the result
808        '''
809        try:
810            conn = self._tls.connection
811        except AttributeError:
812            util.debug('thread %r does not own a connection',
813                       threading.current_thread().name)
814            self._connect()
815            conn = self._tls.connection
816
817        conn.send((self._id, methodname, args, kwds))
818        kind, result = conn.recv()
819
820        if kind == '#RETURN':
821            return result
822        elif kind == '#PROXY':
823            exposed, token = result
824            proxytype = self._manager._registry[token.typeid][-1]
825            token.address = self._token.address
826            proxy = proxytype(
827                token, self._serializer, manager=self._manager,
828                authkey=self._authkey, exposed=exposed
829                )
830            conn = self._Client(token.address, authkey=self._authkey)
831            dispatch(conn, None, 'decref', (token.id,))
832            return proxy
833        raise convert_to_error(kind, result)
834
835    def _getvalue(self):
836        '''
837        Get a copy of the value of the referent
838        '''
839        return self._callmethod('#GETVALUE')
840
841    def _incref(self):
842        if self._owned_by_manager:
843            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
844            return
845
846        conn = self._Client(self._token.address, authkey=self._authkey)
847        dispatch(conn, None, 'incref', (self._id,))
848        util.debug('INCREF %r', self._token.id)
849
850        self._idset.add(self._id)
851
852        state = self._manager and self._manager._state
853
854        self._close = util.Finalize(
855            self, BaseProxy._decref,
856            args=(self._token, self._authkey, state,
857                  self._tls, self._idset, self._Client),
858            exitpriority=10
859            )
860
861    @staticmethod
862    def _decref(token, authkey, state, tls, idset, _Client):
863        idset.discard(token.id)
864
865        # check whether manager is still alive
866        if state is None or state.value == State.STARTED:
867            # tell manager this process no longer cares about referent
868            try:
869                util.debug('DECREF %r', token.id)
870                conn = _Client(token.address, authkey=authkey)
871                dispatch(conn, None, 'decref', (token.id,))
872            except Exception as e:
873                util.debug('... decref failed %s', e)
874
875        else:
876            util.debug('DECREF %r -- manager already shutdown', token.id)
877
878        # check whether we can close this thread's connection because
879        # the process owns no more references to objects for this manager
880        if not idset and hasattr(tls, 'connection'):
881            util.debug('thread %r has no more proxies so closing conn',
882                       threading.current_thread().name)
883            tls.connection.close()
884            del tls.connection
885
886    def _after_fork(self):
887        self._manager = None
888        try:
889            self._incref()
890        except Exception as e:
891            # the proxy may just be for a manager which has shutdown
892            util.info('incref failed: %s' % e)
893
894    def __reduce__(self):
895        kwds = {}
896        if get_spawning_popen() is not None:
897            kwds['authkey'] = self._authkey
898
899        if getattr(self, '_isauto', False):
900            kwds['exposed'] = self._exposed_
901            return (RebuildProxy,
902                    (AutoProxy, self._token, self._serializer, kwds))
903        else:
904            return (RebuildProxy,
905                    (type(self), self._token, self._serializer, kwds))
906
907    def __deepcopy__(self, memo):
908        return self._getvalue()
909
910    def __repr__(self):
911        return '<%s object, typeid %r at %#x>' % \
912               (type(self).__name__, self._token.typeid, id(self))
913
914    def __str__(self):
915        '''
916        Return representation of the referent (or a fall-back if that fails)
917        '''
918        try:
919            return self._callmethod('__repr__')
920        except Exception:
921            return repr(self)[:-1] + "; '__str__()' failed>"
922
923#
924# Function used for unpickling
925#
926
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    `func` is called as ``func(token, serializer, incref=..., **kwds)``.
    If the current process carries a `_manager_server` whose address
    matches `token.address`, the proxy is flagged as manager-owned and the
    referent is recorded in the server's `id_to_local_proxy_obj` mapping.
    Note that `kwds` is modified in place.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # an explicit incref=False wins; otherwise incref unless the current
    # process is flagged as `_inheriting`
    wants_incref = kwds.pop('incref', True)
    incref = wants_incref and not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
943
944#
945# Functions to create proxies and proxy types
946#
947
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are memoised in `_cache` under the key ``(name, tuple(exposed))``,
    so asking again for the same name and methods returns the same class.
    The default `_cache` dict is deliberately shared between calls.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # NOTE(review): each method name is interpolated into source text that
    # is passed to exec(); names are assumed to come from a trusted source.
    template = '''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
968
969
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    The authkey falls back to `manager._authkey` and then to the current
    process's authkey when none is given.  If `exposed` is None, the method
    names are requested with a 'get_methods' call over a temporary
    connection.  The returned proxy is an instance of a type built by
    `MakeProxyType` and is flagged with `_isauto = True`.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey first so that the 'get_methods' connection below
    # is opened with the effective key rather than a possibly-None argument.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    proxy._isauto = True
    return proxy
994
995#
996# Types/callables which we will register with SyncManager
997#
998
class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes.
    '''

    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        '''
        Show the attributes whose names do not start with an underscore,
        sorted, in ``ClassName(name=value, ...)`` form.
        '''
        # snapshot the items before formatting any value
        snapshot = list(self.__dict__.items())
        visible = sorted(
            '%s=%r' % (name, value)
            for name, value in snapshot
            if not name.startswith('_')
        )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(visible))
1010
class Value(object):
    '''
    Holder for a single value together with its typecode.
    '''

    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._typecode, self._value = typecode, value

    def get(self):
        '''Return the stored value.'''
        return self._value

    def set(self, value):
        '''Replace the stored value.'''
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)'%(cls_name, self._typecode, self._value)

    # attribute-style access built on get()/set()
    value = property(get, set)
1022
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`.

    `lock` is accepted but not used.
    '''
    plain_array = array.array(typecode, sequence)
    return plain_array
1025
1026#
1027# Proxy types used by SyncManager
1028#
1029
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator methods via `_callmethod`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy acts as its own iterator
        return self

    def __next__(self, *args):
        '''Forward `__next__` with any positional arguments.'''
        return self._callmethod('__next__', args)

    def send(self, *args):
        '''Forward `send` with any positional arguments.'''
        return self._callmethod('send', args)

    def throw(self, *args):
        '''Forward `throw` with any positional arguments.'''
        return self._callmethod('throw', args)

    def close(self, *args):
        '''Forward `close` with any positional arguments.'''
        return self._callmethod('close', args)
1042
1043
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire`/`release`; usable as a context manager.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        '''
        Forward `acquire`; `timeout` is only sent when it is not None.
        '''
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        '''Forward `release`.'''
        return self._callmethod('release')

    def __enter__(self):
        # entering the context issues an 'acquire' with no arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        # leaving the context issues a 'release'; its result is returned
        return self._callmethod('release')
1055
1056
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify`, `notify_all` and a locally implemented
    `wait_for` on top of `AcquirerProxy`.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        '''Forward `wait` with the given timeout.'''
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        '''Forward `notify` for `n` waiters.'''
        return self._callmethod('notify', (n,))

    def notify_all(self):
        '''Forward `notify_all`.'''
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `self.wait()` repeatedly until `predicate()` is true or
        `timeout` seconds have passed; return the last predicate result.

        `predicate` is evaluated in the calling process, not forwarded.
        '''
        outcome = predicate()
        if outcome:
            return outcome

        # absolute deadline on the monotonic clock, or None to wait forever
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome
1082
1083
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` via `_callmethod`.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        '''Forward `is_set`.'''
        return self._callmethod('is_set')

    def set(self):
        '''Forward `set`.'''
        return self._callmethod('set')

    def clear(self):
        '''Forward `clear`.'''
        return self._callmethod('clear')

    def wait(self, timeout=None):
        '''Forward `wait` with the given timeout.'''
        return self._callmethod('wait', (timeout,))
1094
1095
class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding `wait`, `abort` and `reset`; the `parties`,
    `n_waiting` and `broken` properties are read through a forwarded
    `__getattribute__` call.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        '''Forward `wait` with the given timeout.'''
        return self._callmethod('wait', (timeout,))

    def abort(self):
        '''Forward `abort`.'''
        return self._callmethod('abort')

    def reset(self):
        '''Forward `reset`.'''
        return self._callmethod('reset')

    @property
    def parties(self):
        '''The referent's `parties` attribute.'''
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        '''The referent's `n_waiting` attribute.'''
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        '''The referent's `broken` attribute.'''
        return self._callmethod('__getattribute__', ('broken',))
1113
1114
class NamespaceProxy(BaseProxy):
    '''
    Proxy exposing the referent's attributes as attributes of the proxy.

    Names starting with an underscore are handled locally on the proxy
    object itself; every other name is forwarded through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        # startswith() instead of key[0]: an empty attribute name is then
        # forwarded like any other public name rather than raising
        # IndexError here.
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        # fetch _callmethod via object to bypass this class's own hooks
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1132
1133
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get` and `set`, also reachable as the `value`
    property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        '''Forward `get`.'''
        return self._callmethod('get')

    def set(self, value):
        '''Forward `set` with the new value.'''
        return self._callmethod('set', (value,))

    # attribute-style access built on get()/set()
    value = property(get, set)

    # allows subscripting the class, e.g. ValueProxy[int]
    __class_getitem__ = classmethod(types.GenericAlias)
1143
1144
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''

    def __iadd__(self, value):
        # `proxy += value` is carried out as a forwarded extend()
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # forward the in-place multiply, then keep the name bound to self
        self._callmethod('__imul__', (value,))
        return self
1158
1159
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# the result of __iter__ is associated with the 'Iterator' typeid
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
1168
1169
# proxy type exposing length and item get/set only
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
1173
1174
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# typeids associated with the results of the listed methods
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    `terminate()`.
    '''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
1191
1192#
1193# Definition of SyncManager
1194#
1195
class SyncManager(BaseManager):
    '''
    `BaseManager` subclass with a set of shared object types registered.

    The registered types are the ones intended for synchronizing threads,
    plus `dict`, `list` and `Namespace`.

    Started instances of this class are what the
    `multiprocessing.Manager()` function creates.
    '''

# queues: registered without a dedicated proxy type
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# synchronization primitives from `threading`
SyncManager.register('Event', threading.Event, proxytype=EventProxy)
SyncManager.register('Lock', threading.Lock, proxytype=AcquirerProxy)
SyncManager.register('RLock', threading.RLock, proxytype=AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     proxytype=AcquirerProxy)
SyncManager.register('Condition', threading.Condition,
                     proxytype=ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, proxytype=BarrierProxy)

# process pool, containers and value holders
SyncManager.register('Pool', pool.Pool, proxytype=PoolProxy)
SyncManager.register('list', list, proxytype=ListProxy)
SyncManager.register('dict', dict, proxytype=DictProxy)
SyncManager.register('Value', Value, proxytype=ValueProxy)
SyncManager.register('Array', Array, proxytype=ArrayProxy)
SyncManager.register('Namespace', Namespace, proxytype=NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1227
1228#
1229# Definition of SharedMemoryManager and SharedMemoryServer
1230#
1231
1232if HAS_SHMEM:
1233    class _SharedMemoryTracker:
1234        "Manages one or more shared memory segments."
1235
1236        def __init__(self, name, segment_names=[]):
1237            self.shared_memory_context_name = name
1238            self.segment_names = segment_names
1239
1240        def register_segment(self, segment_name):
1241            "Adds the supplied shared memory block name to tracker."
1242            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1243            self.segment_names.append(segment_name)
1244
1245        def destroy_segment(self, segment_name):
1246            """Calls unlink() on the shared memory block with the supplied name
1247            and removes it from the list of blocks being tracked."""
1248            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1249            self.segment_names.remove(segment_name)
1250            segment = shared_memory.SharedMemory(segment_name)
1251            segment.close()
1252            segment.unlink()
1253
1254        def unlink(self):
1255            "Calls destroy_segment() on all tracked shared memory blocks."
1256            for segment_name in self.segment_names[:]:
1257                self.destroy_segment(segment_name)
1258
1259        def __del__(self):
1260            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1261            self.unlink()
1262
1263        def __getstate__(self):
1264            return (self.shared_memory_context_name, self.segment_names)
1265
1266        def __setstate__(self, state):
1267            self.__init__(*state)
1268
1269
1270    class SharedMemoryServer(Server):
1271
1272        public = Server.public + \
1273                 ['track_segment', 'release_segment', 'list_segments']
1274
1275        def __init__(self, *args, **kwargs):
1276            Server.__init__(self, *args, **kwargs)
1277            address = self.address
1278            # The address of Linux abstract namespaces can be bytes
1279            if isinstance(address, bytes):
1280                address = os.fsdecode(address)
1281            self.shared_memory_context = \
1282                _SharedMemoryTracker(f"shm_{address}_{getpid()}")
1283            util.debug(f"SharedMemoryServer started by pid {getpid()}")
1284
1285        def create(self, c, typeid, /, *args, **kwargs):
1286            """Create a new distributed-shared object (not backed by a shared
1287            memory block) and return its id to be used in a Proxy Object."""
1288            # Unless set up as a shared proxy, don't make shared_memory_context
1289            # a standard part of kwargs.  This makes things easier for supplying
1290            # simple functions.
1291            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1292                kwargs['shared_memory_context'] = self.shared_memory_context
1293            return Server.create(self, c, typeid, *args, **kwargs)
1294
1295        def shutdown(self, c):
1296            "Call unlink() on all tracked shared memory, terminate the Server."
1297            self.shared_memory_context.unlink()
1298            return Server.shutdown(self, c)
1299
1300        def track_segment(self, c, segment_name):
1301            "Adds the supplied shared memory block name to Server's tracker."
1302            self.shared_memory_context.register_segment(segment_name)
1303
1304        def release_segment(self, c, segment_name):
1305            """Calls unlink() on the shared memory block with the supplied name
1306            and removes it from the tracker instance inside the Server."""
1307            self.shared_memory_context.destroy_segment(segment_name)
1308
1309        def list_segments(self, c):
1310            """Returns a list of names of shared memory blocks that the Server
1311            is currently tracking."""
1312            return self.shared_memory_context.segment_names
1313
1314
1315    class SharedMemoryManager(BaseManager):
1316        """Like SyncManager but uses SharedMemoryServer instead of Server.
1317
1318        It provides methods for creating and returning SharedMemory instances
1319        and for creating a list-like object (ShareableList) backed by shared
1320        memory.  It also provides methods that create and return Proxy Objects
1321        that support synchronization across processes (i.e. multi-process-safe
1322        locks and semaphores).
1323        """
1324
1325        _Server = SharedMemoryServer
1326
1327        def __init__(self, *args, **kwargs):
1328            if os.name == "posix":
1329                # bpo-36867: Ensure the resource_tracker is running before
1330                # launching the manager process, so that concurrent
1331                # shared_memory manipulation both in the manager and in the
1332                # current process does not create two resource_tracker
1333                # processes.
1334                from . import resource_tracker
1335                resource_tracker.ensure_running()
1336            BaseManager.__init__(self, *args, **kwargs)
1337            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1338
1339        def __del__(self):
1340            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1341            pass
1342
1343        def get_server(self):
1344            'Better than monkeypatching for now; merge into Server ultimately'
1345            if self._state.value != State.INITIAL:
1346                if self._state.value == State.STARTED:
1347                    raise ProcessError("Already started SharedMemoryServer")
1348                elif self._state.value == State.SHUTDOWN:
1349                    raise ProcessError("SharedMemoryManager has shut down")
1350                else:
1351                    raise ProcessError(
1352                        "Unknown state {!r}".format(self._state.value))
1353            return self._Server(self._registry, self._address,
1354                                self._authkey, self._serializer)
1355
1356        def SharedMemory(self, size):
1357            """Returns a new SharedMemory instance with the specified size in
1358            bytes, to be tracked by the manager."""
1359            with self._Client(self._address, authkey=self._authkey) as conn:
1360                sms = shared_memory.SharedMemory(None, create=True, size=size)
1361                try:
1362                    dispatch(conn, None, 'track_segment', (sms.name,))
1363                except BaseException as e:
1364                    sms.unlink()
1365                    raise e
1366            return sms
1367
1368        def ShareableList(self, sequence):
1369            """Returns a new ShareableList instance populated with the values
1370            from the input sequence, to be tracked by the manager."""
1371            with self._Client(self._address, authkey=self._authkey) as conn:
1372                sl = shared_memory.ShareableList(sequence)
1373                try:
1374                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
1375                except BaseException as e:
1376                    sl.shm.unlink()
1377                    raise e
1378            return sl
1379