• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module providing manager classes for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# Licensed to PSF under a Contributor Agreement.
9#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
17import sys
18import threading
19import signal
20import array
21import queue
22import time
23import types
24import os
25from os import getpid
26
27from traceback import format_exc
28
29from . import connection
30from .context import reduction, get_spawning_popen, ProcessError
31from . import pool
32from . import process
33from . import util
34from . import get_context
35try:
36    from . import shared_memory
37except ImportError:
38    HAS_SHMEM = False
39else:
40    HAS_SHMEM = True
41    __all__.append('SharedMemoryManager')
42
43#
44# Register some things for pickling
45#
46
def reduce_array(a):
    '''
    Reduce an `array.array` to the constructor call that rebuilds it
    '''
    rebuild_args = (a.typecode, a.tobytes())
    return array.array, rebuild_args
reduction.register(array.array, reduce_array)
50
def rebuild_as_list(obj):
    '''
    Reduce `obj` to a call of list() on a list copy of its elements
    '''
    return list, (list(obj),)

# register the types of the dict views so that they are pickled as lists
for _view_method in ('items','keys','values'):
    reduction.register(type(getattr({}, _view_method)()), rebuild_as_list)
del _view_method
57
58#
59# Type for identifying shared objects
60#
61
class Token(object):
    '''
    Identifier of a shared object: its type id, an address and an object id
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # pickled as a plain 3-tuple
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # unpack first so that a malformed state leaves the token untouched
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__, self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
80
81#
82# Function for communication with a manager's server process
83#
84
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send one request over connection `c` and return the manager's answer

    A '#RETURN' reply yields its payload; any other reply is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        try:
            raise convert_to_error(kind, result)
        finally:
            del result  # break reference cycle
    return result
97
def convert_to_error(kind, result):
    '''
    Build the exception object that corresponds to a non-'#RETURN' reply

    '#ERROR' replies already carry the exception; '#TRACEBACK' and
    '#UNSERIALIZABLE' replies must carry a str (TypeError is raised
    otherwise) which is wrapped in RemoteError; anything else produces a
    ValueError.  Apart from that TypeError the exception is returned, not
    raised.
    '''
    if kind == '#ERROR':
        return result
    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))
    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))
    if kind == '#TRACEBACK':
        return RemoteError(result)
    return RemoteError('Unserializable message: %s\n' % result)
112
class RemoteError(Exception):
    '''
    Exception wrapping the text of a failure reported by the manager

    convert_to_error() builds instances with a single str argument; str()
    frames that text between two rules of 75 dashes.
    '''
    def __str__(self):
        # An instance created without arguments has an empty `args` tuple;
        # use an empty message instead of failing with IndexError.
        message = str(self.args[0]) if self.args else ''
        rule = '-'*75
        return '\n' + rule + '\n' + message + rule
116
117#
118# Functions for finding the method names of an object
119#
120
def all_methods(obj):
    '''
    Return the names of all callable attributes of `obj`, in dir() order
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
131
def public_methods(obj):
    '''
    Return the names of the callable attributes of `obj` whose first
    character is not an underscore
    '''
    exported = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        exported.append(name)
    return exported
137
138#
139# Server which is run in a process controlled by a manager
140#
141
142class Server(object):
143    '''
144    Server class which runs in a process controlled by a manager object
145    '''
146    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
147              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
148
149    def __init__(self, registry, address, authkey, serializer):
150        if not isinstance(authkey, bytes):
151            raise TypeError(
152                "Authkey {0!r} is type {1!s}, not bytes".format(
153                    authkey, type(authkey)))
154        self.registry = registry
155        self.authkey = process.AuthenticationString(authkey)
156        Listener, Client = listener_client[serializer]
157
158        # do authentication later
159        self.listener = Listener(address=address, backlog=128)
160        self.address = self.listener.address
161
162        self.id_to_obj = {'0': (None, ())}
163        self.id_to_refcount = {}
164        self.id_to_local_proxy_obj = {}
165        self.mutex = threading.Lock()
166
167    def serve_forever(self):
168        '''
169        Run the server forever
170        '''
171        self.stop_event = threading.Event()
172        process.current_process()._manager_server = self
173        try:
174            accepter = threading.Thread(target=self.accepter)
175            accepter.daemon = True
176            accepter.start()
177            try:
178                while not self.stop_event.is_set():
179                    self.stop_event.wait(1)
180            except (KeyboardInterrupt, SystemExit):
181                pass
182        finally:
183            if sys.stdout != sys.__stdout__: # what about stderr?
184                util.debug('resetting stdout, stderr')
185                sys.stdout = sys.__stdout__
186                sys.stderr = sys.__stderr__
187            sys.exit(0)
188
189    def accepter(self):
190        while True:
191            try:
192                c = self.listener.accept()
193            except OSError:
194                continue
195            t = threading.Thread(target=self.handle_request, args=(c,))
196            t.daemon = True
197            t.start()
198
199    def _handle_request(self, c):
200        request = None
201        try:
202            connection.deliver_challenge(c, self.authkey)
203            connection.answer_challenge(c, self.authkey)
204            request = c.recv()
205            ignore, funcname, args, kwds = request
206            assert funcname in self.public, '%r unrecognized' % funcname
207            func = getattr(self, funcname)
208        except Exception:
209            msg = ('#TRACEBACK', format_exc())
210        else:
211            try:
212                result = func(c, *args, **kwds)
213            except Exception:
214                msg = ('#TRACEBACK', format_exc())
215            else:
216                msg = ('#RETURN', result)
217
218        try:
219            c.send(msg)
220        except Exception as e:
221            try:
222                c.send(('#TRACEBACK', format_exc()))
223            except Exception:
224                pass
225            util.info('Failure to send message: %r', msg)
226            util.info(' ... request was %r', request)
227            util.info(' ... exception was %r', e)
228
229    def handle_request(self, conn):
230        '''
231        Handle a new connection
232        '''
233        try:
234            self._handle_request(conn)
235        except SystemExit:
236            # Server.serve_client() calls sys.exit(0) on EOF
237            pass
238        finally:
239            conn.close()
240
    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is a tuple
        (ident, methodname, args, kwds); the reply is one of
        ('#RETURN', value), ('#PROXY', (exposed, token)), ('#ERROR', exc),
        ('#TRACEBACK', text) or ('#UNSERIALIZABLE', text).  The thread leaves
        through sys.exit(0) on EOF and sys.exit(1) if no reply can be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        # bind frequently used attributes to locals for the loop
        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                # reset so the AttributeError handler below can tell whether
                # a request was unpacked before the failure
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # not a tracked object; try the stash of local proxy
                    # referents and re-raise the original error if absent
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # an exception raised by the referent's method is sent
                    # back as an object rather than as traceback text
                    msg = ('#ERROR', e)
                else:
                    # if the method is mapped to a typeid, register the
                    # result as a new shared object and reply with a token
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    # failed before a method name was unpacked
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # unexposed or missing method: try the fallbacks
                    # ('__str__', '__repr__', '#GETVALUE'); an unknown name
                    # raises KeyError, which is reported as a traceback
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    # the reply could not be sent; report that instead
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                # even the error report failed: log, close and give up
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)
321
    def fallback_getvalue(self, conn, ident, obj):
        '''
        Return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Return str() of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Return repr() of the referent
        '''
        return repr(obj)

    # serve_client() looks a method name up here after an AttributeError
    # and calls the entry as fallback_func(self, conn, ident, obj, ...).
    # The values are the plain functions defined just above.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }
336
337    def dummy(self, c):
338        pass
339
340    def debug_info(self, c):
341        '''
342        Return some info --- useful to spot problems with refcounting
343        '''
344        # Perhaps include debug info about 'c'?
345        with self.mutex:
346            result = []
347            keys = list(self.id_to_refcount.keys())
348            keys.sort()
349            for ident in keys:
350                if ident != '0':
351                    result.append('  %s:       refcount=%s\n    %s' %
352                                  (ident, self.id_to_refcount[ident],
353                                   str(self.id_to_obj[ident][0])[:75]))
354            return '\n'.join(result)
355
356    def number_of_objects(self, c):
357        '''
358        Number of shared objects
359        '''
360        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
361        return len(self.id_to_refcount)
362
363    def shutdown(self, c):
364        '''
365        Shutdown this process
366        '''
367        try:
368            util.debug('manager received shutdown message')
369            c.send(('#RETURN', None))
370        except:
371            import traceback
372            traceback.print_exc()
373        finally:
374            self.stop_event.set()
375
376    def create(self, c, typeid, /, *args, **kwds):
377        '''
378        Create a new shared object and return its id
379        '''
380        with self.mutex:
381            callable, exposed, method_to_typeid, proxytype = \
382                      self.registry[typeid]
383
384            if callable is None:
385                if kwds or (len(args) != 1):
386                    raise ValueError(
387                        "Without callable, must have one non-keyword argument")
388                obj = args[0]
389            else:
390                obj = callable(*args, **kwds)
391
392            if exposed is None:
393                exposed = public_methods(obj)
394            if method_to_typeid is not None:
395                if not isinstance(method_to_typeid, dict):
396                    raise TypeError(
397                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
398                            method_to_typeid, type(method_to_typeid)))
399                exposed = list(exposed) + list(method_to_typeid)
400
401            ident = '%x' % id(obj)  # convert to string because xmlrpclib
402                                    # only has 32 bit signed integers
403            util.debug('%r callable returned object with id %r', typeid, ident)
404
405            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
406            if ident not in self.id_to_refcount:
407                self.id_to_refcount[ident] = 0
408
409        self.incref(c, ident)
410        return ident, tuple(exposed)
411
412    def get_methods(self, c, token):
413        '''
414        Return the methods of the shared object indicated by token
415        '''
416        return tuple(self.id_to_obj[token.id][1])
417
418    def accept_connection(self, c, name):
419        '''
420        Spawn a new thread to serve this connection
421        '''
422        threading.current_thread().name = name
423        c.send(('#RETURN', None))
424        self.serve_client(c)
425
426    def incref(self, c, ident):
427        with self.mutex:
428            try:
429                self.id_to_refcount[ident] += 1
430            except KeyError as ke:
431                # If no external references exist but an internal (to the
432                # manager) still does and a new external reference is created
433                # from it, restore the manager's tracking of it from the
434                # previously stashed internal ref.
435                if ident in self.id_to_local_proxy_obj:
436                    self.id_to_refcount[ident] = 1
437                    self.id_to_obj[ident] = \
438                        self.id_to_local_proxy_obj[ident]
439                    util.debug('Server re-enabled tracking & INCREF %r', ident)
440                else:
441                    raise ke
442
    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`, disposing of the object
        when its count reaches zero

        Raises AssertionError if the recorded refcount is not positive.
        '''
        # An ident that is no longer refcounted but is stashed in
        # id_to_local_proxy_obj is skipped rather than treated as an error.
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                # the last reference is gone: stop tracking the count
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
469
470
471#
472# Class to represent state of a manager
473#
474
class State(object):
    '''
    Mutable holder of a manager's life-cycle state in its `value` attribute
    '''
    __slots__ = ['value']
    # the three states a manager goes through, in order
    INITIAL, STARTED, SHUTDOWN = range(3)
480
481#
482# Mapping from serializer name to Listener and Client types
483#
484
# Each value is a (Listener type, Client type) pair.  Server.__init__ and
# BaseManager.__init__ index this table with their `serializer` argument.
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
489
490#
491# Definition of BaseManager
492#
493
494class BaseManager(object):
495    '''
496    Base class for managers
497    '''
498    _registry = {}
499    _Server = Server
500
501    def __init__(self, address=None, authkey=None, serializer='pickle',
502                 ctx=None, *, shutdown_timeout=1.0):
503        if authkey is None:
504            authkey = process.current_process().authkey
505        self._address = address     # XXX not final address if eg ('', 0)
506        self._authkey = process.AuthenticationString(authkey)
507        self._state = State()
508        self._state.value = State.INITIAL
509        self._serializer = serializer
510        self._Listener, self._Client = listener_client[serializer]
511        self._ctx = ctx or get_context()
512        self._shutdown_timeout = shutdown_timeout
513
514    def get_server(self):
515        '''
516        Return server object with serve_forever() method and address attribute
517        '''
518        if self._state.value != State.INITIAL:
519            if self._state.value == State.STARTED:
520                raise ProcessError("Already started server")
521            elif self._state.value == State.SHUTDOWN:
522                raise ProcessError("Manager has shut down")
523            else:
524                raise ProcessError(
525                    "Unknown state {!r}".format(self._state.value))
526        return Server(self._registry, self._address,
527                      self._authkey, self._serializer)
528
529    def connect(self):
530        '''
531        Connect manager object to the server process
532        '''
533        Listener, Client = listener_client[self._serializer]
534        conn = Client(self._address, authkey=self._authkey)
535        dispatch(conn, None, 'dummy')
536        self._state.value = State.STARTED
537
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        `initializer`, if given, must be callable; it is passed together with
        `initargs` to _run_server(), which calls initializer(*initargs)
        before creating the server.

        Raises ProcessError if the manager is not in its initial state and
        TypeError if `initializer` is neither None nor callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        # name the process after the manager class and the process identity
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        # (this process only reads, so its handle on the write end is closed
        # before waiting for the address)
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        # (it is also stored as self.shutdown, which __exit__() calls)
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey, self._state,
                  self._Client, self._shutdown_timeout),
            exitpriority=0
            )
580
    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it

        This is the target of the process spawned by start().  `writer` is
        the connection on which the server's address is sent back; it is
        closed right after.  `initializer`, if not None, is called as
        initializer(*initargs) before the server is created.
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        # (through the `_Server` class attribute)
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()
603
604    def _create(self, typeid, /, *args, **kwds):
605        '''
606        Create a new shared object; return the token and exposed tuple
607        '''
608        assert self._state.value == State.STARTED, 'server not yet started'
609        conn = self._Client(self._address, authkey=self._authkey)
610        try:
611            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
612        finally:
613            conn.close()
614        return Token(typeid, self._address, id), exposed
615
616    def join(self, timeout=None):
617        '''
618        Join the manager process (if it has been spawned)
619        '''
620        if self._process is not None:
621            self._process.join(timeout)
622            if not self._process.is_alive():
623                self._process = None
624
625    def _debug_info(self):
626        '''
627        Return some info about the servers shared objects and connections
628        '''
629        conn = self._Client(self._address, authkey=self._authkey)
630        try:
631            return dispatch(conn, None, 'debug_info')
632        finally:
633            conn.close()
634
635    def _number_of_objects(self):
636        '''
637        Return the number of shared objects
638        '''
639        conn = self._Client(self._address, authkey=self._authkey)
640        try:
641            return dispatch(conn, None, 'number_of_objects')
642        finally:
643            conn.close()
644
645    def __enter__(self):
646        if self._state.value == State.INITIAL:
647            self.start()
648        if self._state.value != State.STARTED:
649            if self._state.value == State.INITIAL:
650                raise ProcessError("Unable to start server")
651            elif self._state.value == State.SHUTDOWN:
652                raise ProcessError("Manager has shut down")
653            else:
654                raise ProcessError(
655                    "Unknown state {!r}".format(self._state.value))
656        return self
657
658    def __exit__(self, exc_type, exc_val, exc_tb):
659        self.shutdown()
660
    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client,
                          shutdown_timeout):
        '''
        Shutdown the manager process; will be registered as a finalizer

        Asks the server to shut down, waits up to `shutdown_timeout` for the
        process, then escalates to terminate() and finally kill().
        Afterwards `state` is marked as shut down and the BaseProxy cache
        entry for `address` is dropped.
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: a failed request falls through to the
                # join/terminate/kill sequence below
                pass

            process.join(timeout=shutdown_timeout)
            if process.is_alive():
                util.info('manager still alive')
                # NOTE(review): presumably not every process object passed
                # here provides terminate() -- confirm
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=shutdown_timeout)
                    if process.is_alive():
                        util.info('manager still alive after terminate')
                        process.kill()
                        process.join()

        state.value = State.SHUTDOWN
        # forget the per-address data that BaseProxy.__init__ cached
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass
695
696    @property
697    def address(self):
698        return self._address
699
700    @classmethod
701    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
702                 method_to_typeid=None, create_method=True):
703        '''
704        Register a typeid with the manager type
705        '''
706        if '_registry' not in cls.__dict__:
707            cls._registry = cls._registry.copy()
708
709        if proxytype is None:
710            proxytype = AutoProxy
711
712        exposed = exposed or getattr(proxytype, '_exposed_', None)
713
714        method_to_typeid = method_to_typeid or \
715                           getattr(proxytype, '_method_to_typeid_', None)
716
717        if method_to_typeid:
718            for key, value in list(method_to_typeid.items()): # isinstance?
719                assert type(key) is str, '%r is not a string' % key
720                assert type(value) is str, '%r is not a string' % value
721
722        cls._registry[typeid] = (
723            callable, exposed, method_to_typeid, proxytype
724            )
725
726        if create_method:
727            def temp(self, /, *args, **kwds):
728                util.debug('requesting creation of a shared %r object', typeid)
729                token, exp = self._create(typeid, *args, **kwds)
730                proxy = proxytype(
731                    token, self._serializer, manager=self,
732                    authkey=self._authkey, exposed=exp
733                    )
734                conn = self._Client(token.address, authkey=self._authkey)
735                dispatch(conn, None, 'decref', (token.id,))
736                return proxy
737            temp.__name__ = typeid
738            setattr(cls, typeid, temp)
739
740#
741# Subclass of set which get cleared after a fork
742#
743
class ProcessLocalSet(set):
    '''
    Set which registers itself to be cleared after a fork and which
    pickles as an empty set of the same type
    '''
    def __init__(self):
        def _clear(instance):
            instance.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        cls = type(self)
        return cls, ()
749
750#
751# Definition of BaseProxy
752#
753
754class BaseProxy(object):
755    '''
756    A base for proxies of shared objects
757    '''
758    _address_to_local = {}
759    _mutex = util.ForkAwareThreadLock()
760
761    # Each instance gets a `_serial` number. Unlike `id(...)`, this number
762    # is never reused.
763    _next_serial = 1
764
    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Set up a proxy for the shared object identified by `token`

        `serializer` selects the client type from `listener_client`.  The
        authkey used is `authkey` if given, else the manager's, else the
        current process's.  If `incref` is true, _incref() is called.
        `manager_owned` is stored as `_owned_by_manager`.  `exposed` is not
        used by this method.
        '''
        # look up (or create) the data shared by all proxies for the same
        # address, and hand out the next serial number, under the class lock
        with BaseProxy._mutex:
            tls_serials = BaseProxy._address_to_local.get(token.address, None)
            if tls_serials is None:
                tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_serials

            self._serial = BaseProxy._next_serial
            BaseProxy._next_serial += 1

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_serials[0]

        # self._all_serials is a set used to record the identities of all
        # shared objects for which the current process owns references and
        # which are in the manager at token.address
        self._all_serials = tls_serials[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        # have _after_fork() run for this proxy after a fork
        util.register_after_fork(self, BaseProxy._after_fork)
808
809    def _connect(self):
810        util.debug('making connection to manager')
811        name = process.current_process().name
812        if threading.current_thread().name != 'MainThread':
813            name += '|' + threading.current_thread().name
814        conn = self._Client(self._token.address, authkey=self._authkey)
815        dispatch(conn, None, 'accept_connection', (name,))
816        self._tls.connection = conn
817
818    def _callmethod(self, methodname, args=(), kwds={}):
819        '''
820        Try to call a method of the referent and return a copy of the result
821        '''
822        try:
823            conn = self._tls.connection
824        except AttributeError:
825            util.debug('thread %r does not own a connection',
826                       threading.current_thread().name)
827            self._connect()
828            conn = self._tls.connection
829
830        conn.send((self._id, methodname, args, kwds))
831        kind, result = conn.recv()
832
833        if kind == '#RETURN':
834            return result
835        elif kind == '#PROXY':
836            exposed, token = result
837            proxytype = self._manager._registry[token.typeid][-1]
838            token.address = self._token.address
839            proxy = proxytype(
840                token, self._serializer, manager=self._manager,
841                authkey=self._authkey, exposed=exposed
842                )
843            conn = self._Client(token.address, authkey=self._authkey)
844            dispatch(conn, None, 'decref', (token.id,))
845            return proxy
846        try:
847            raise convert_to_error(kind, result)
848        finally:
849            del result   # break reference cycle
850
851    def _getvalue(self):
852        '''
853        Get a copy of the value of the referent
854        '''
855        return self._callmethod('#GETVALUE')
856
857    def _incref(self):
858        if self._owned_by_manager:
859            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
860            return
861
862        conn = self._Client(self._token.address, authkey=self._authkey)
863        dispatch(conn, None, 'incref', (self._id,))
864        util.debug('INCREF %r', self._token.id)
865
866        self._all_serials.add(self._serial)
867
868        state = self._manager and self._manager._state
869
870        self._close = util.Finalize(
871            self, BaseProxy._decref,
872            args=(self._token, self._serial, self._authkey, state,
873                  self._tls, self._all_serials, self._Client),
874            exitpriority=10
875            )
876
877    @staticmethod
878    def _decref(token, serial, authkey, state, tls, idset, _Client):
879        idset.discard(serial)
880
881        # check whether manager is still alive
882        if state is None or state.value == State.STARTED:
883            # tell manager this process no longer cares about referent
884            try:
885                util.debug('DECREF %r', token.id)
886                conn = _Client(token.address, authkey=authkey)
887                dispatch(conn, None, 'decref', (token.id,))
888            except Exception as e:
889                util.debug('... decref failed %s', e)
890
891        else:
892            util.debug('DECREF %r -- manager already shutdown', token.id)
893
894        # check whether we can close this thread's connection because
895        # the process owns no more references to objects for this manager
896        if not idset and hasattr(tls, 'connection'):
897            util.debug('thread %r has no more proxies so closing conn',
898                       threading.current_thread().name)
899            tls.connection.close()
900            del tls.connection
901
902    def _after_fork(self):
903        self._manager = None
904        try:
905            self._incref()
906        except Exception as e:
907            # the proxy may just be for a manager which has shutdown
908            util.info('incref failed: %s' % e)
909
910    def __reduce__(self):
911        kwds = {}
912        if get_spawning_popen() is not None:
913            kwds['authkey'] = self._authkey
914
915        if getattr(self, '_isauto', False):
916            kwds['exposed'] = self._exposed_
917            return (RebuildProxy,
918                    (AutoProxy, self._token, self._serializer, kwds))
919        else:
920            return (RebuildProxy,
921                    (type(self), self._token, self._serializer, kwds))
922
923    def __deepcopy__(self, memo):
924        return self._getvalue()
925
926    def __repr__(self):
927        return '<%s object, typeid %r at %#x>' % \
928               (type(self).__name__, self._token.typeid, id(self))
929
930    def __str__(self):
931        '''
932        Return representation of the referent (or a fall-back if that fails)
933        '''
934        try:
935            return self._callmethod('__repr__')
936        except Exception:
937            return repr(self)[:-1] + "; '__str__()' failed>"
938
939#
940# Function used for unpickling
941#
942
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Calls `func(token, serializer, incref=..., **kwds)`.  `kwds` is
    modified in place: 'incref' is popped, and 'manager_owned' is set when
    the current process runs the manager server at the token's address.
    '''
    this_process = process.current_process()
    server = getattr(this_process, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    requested = kwds.pop('incref', True)
    if requested:
        # no incref while the process is flagged as inheriting
        incref = not getattr(this_process, '_inheriting', False)
    else:
        incref = requested
    return func(token, serializer, incref=incref, **kwds)
959
960#
961# Functions to create proxies and proxy types
962#
963
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Types are cached per (name, exposed) pair in `_cache`, so repeated
    calls with the same arguments return the same class.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    # source of one forwarding method; filled in with the method name twice
    method_source = '''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''

    namespace = {}
    for method_name in exposed:
        exec(method_source % (method_name, method_name), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
984
985
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    When `exposed` is None the method names are requested from the
    manager with a 'get_methods' call.  When `authkey` is None it falls
    back to the manager's authkey, then to the current process's authkey.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before opening any connection, so that the
    # 'get_methods' request below uses the same key as the proxy itself
    # instead of going out with authkey=None.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marker read by __reduce__ so the proxy is rebuilt through AutoProxy
    proxy._isauto = True
    return proxy
1010
1011#
1012# Types/callables which we will register with SyncManager
1013#
1014
class Namespace(object):
    '''
    Plain attribute container initialised from keyword arguments
    '''
    def __init__(self, /, **kwds):
        vars(self).update(kwds)

    def __repr__(self):
        # show only attributes whose name has no leading underscore,
        # sorted by their formatted 'name=value' text
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
1026
class Value(object):
    '''
    Holder for a single value together with its typecode
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)' % (cls_name, self._typecode, self._value)

    # attribute-style access on top of get()/set()
    value = property(get, set)
1038
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` filled from `sequence`
    '''
    # `lock` is accepted but not used
    new_array = array.array(typecode, sequence)
    return new_array
1041
1042#
1043# Proxy types used by SyncManager
1044#
1045
class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding `__next__`, `send`, `throw` and `close` to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *args):
        forwarded = self._callmethod('__next__', args)
        return forwarded

    def send(self, *args):
        forwarded = self._callmethod('send', args)
        return forwarded

    def throw(self, *args):
        forwarded = self._callmethod('throw', args)
        return forwarded

    def close(self, *args):
        forwarded = self._callmethod('close', args)
        return forwarded
1058
1059
class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire`/`release`, usable in a `with` block
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # returns the result of the remote acquire, not the proxy
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1071
1072
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for condition objects; `wait_for` is evaluated on the proxy side
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `wait()` repeatedly until `predicate()` is true or `timeout`
        seconds have elapsed; return the last value of `predicate()`.
        '''
        outcome = predicate()
        if outcome:
            return outcome

        if timeout is None:
            deadline = None
            remaining = None
        else:
            deadline = time.monotonic() + timeout

        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome
1098
1099
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        flag = self._callmethod('is_set')
        return flag

    def set(self):
        outcome = self._callmethod('set')
        return outcome

    def clear(self):
        outcome = self._callmethod('clear')
        return outcome

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1110
1111
class BarrierProxy(BaseProxy):
    '''
    Proxy for barrier objects

    `parties`, `n_waiting` and `broken` are read from the referent through
    its `__getattribute__` on every access.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr_name = ('parties',)
        return self._callmethod('__getattribute__', attr_name)

    @property
    def n_waiting(self):
        attr_name = ('n_waiting',)
        return self._callmethod('__getattribute__', attr_name)

    @property
    def broken(self):
        attr_name = ('broken',)
        return self._callmethod('__getattribute__', attr_name)
1129
1130
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose public attributes live on the referent

    Attribute names starting with an underscore are handled locally on the
    proxy object; every other get, set or delete is forwarded to the
    referent's `__getattribute__`, `__setattr__` or `__delattr__`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    # `key.startswith('_')` is used rather than `key[0] == '_'` so that an
    # empty attribute name is forwarded instead of raising IndexError.

    def __getattr__(self, key):
        if key.startswith('_'):
            return object.__getattribute__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))

    def __setattr__(self, key, value):
        if key.startswith('_'):
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))

    def __delattr__(self, key):
        if key.startswith('_'):
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))
1148
1149
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get`/`set`, also available as the `value` property
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        current = self._callmethod('get')
        return current

    def set(self, value):
        set_args = (value,)
        return self._callmethod('set', set_args)

    value = property(get, set)

    # allows subscripting the class, e.g. ValueProxy[int]
    __class_getitem__ = classmethod(types.GenericAlias)
1159
1160
# Methods forwarded to the shared list.  `clear` and `copy` are exposed too
# so that list proxies cover those standard `list` methods.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'clear', 'copy', 'count', 'extend', 'index', 'insert', 'pop',
    'remove', 'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `+=` is carried out as a remote extend()
        extend_args = (value,)
        self._callmethod('extend', extend_args)
        return self

    def __imul__(self, value):
        repeat_args = (value,)
        self._callmethod('__imul__', repeat_args)
        return self

    # allows subscripting the class, e.g. ListProxy[int]
    __class_getitem__ = classmethod(types.GenericAlias)
1176
1177
# Generated base type holding the forwarded dict methods
_BaseDictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__',
        'clear', 'copy', 'get', 'items', 'keys', 'pop', 'popitem',
        'setdefault', 'update', 'values',
    ),
)
# the result of `__iter__` is handed back under the 'Iterator' typeid
_BaseDictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}


class DictProxy(_BaseDictProxy):
    # allows subscripting the class, e.g. DictProxy[str, int]
    __class_getitem__ = classmethod(types.GenericAlias)
1188
1189
# Array proxies only forward length and item access
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
1193
1194
# Generated base type holding the forwarded pool methods
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# typeid under which the result of each of these methods is handed back
BasePoolProxy._method_to_typeid_ = dict(
    apply_async='AsyncResult',
    map_async='AsyncResult',
    starmap_async='AsyncResult',
    imap='Iterator',
    imap_unordered='Iterator',
)
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block terminates
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # the result of terminate() is discarded, so exceptions propagate
        self.terminate()
1211
1212#
1213# Definition of SyncManager
1214#
1215
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads (`Event`, `Lock`, `RLock`, `Semaphore`, `BoundedSemaphore`,
    `Condition` and `Barrier`), plus `Queue`, `JoinableQueue`, `Pool`,
    `list`, `dict`, `Value`, `Array` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1226
# Queues: both typeids are backed by the same `queue.Queue` class, and no
# proxy type is passed for them.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
# Synchronization primitives from `threading`, each with an explicit proxy
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
# Process pool and container/value types
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# typeids named in the `_method_to_typeid_` maps of the pool and dict
# proxies; they are registered with create_method=False
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1247
1248#
1249# Definition of SharedMemoryManager and SharedMemoryServer
1250#
1251
1252if HAS_SHMEM:
1253    class _SharedMemoryTracker:
1254        "Manages one or more shared memory segments."
1255
1256        def __init__(self, name, segment_names=[]):
1257            self.shared_memory_context_name = name
1258            self.segment_names = segment_names
1259
1260        def register_segment(self, segment_name):
1261            "Adds the supplied shared memory block name to tracker."
1262            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1263            self.segment_names.append(segment_name)
1264
1265        def destroy_segment(self, segment_name):
1266            """Calls unlink() on the shared memory block with the supplied name
1267            and removes it from the list of blocks being tracked."""
1268            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1269            self.segment_names.remove(segment_name)
1270            segment = shared_memory.SharedMemory(segment_name)
1271            segment.close()
1272            segment.unlink()
1273
1274        def unlink(self):
1275            "Calls destroy_segment() on all tracked shared memory blocks."
1276            for segment_name in self.segment_names[:]:
1277                self.destroy_segment(segment_name)
1278
1279        def __del__(self):
1280            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1281            self.unlink()
1282
1283        def __getstate__(self):
1284            return (self.shared_memory_context_name, self.segment_names)
1285
1286        def __setstate__(self, state):
1287            self.__init__(*state)
1288
1289
1290    class SharedMemoryServer(Server):
1291
1292        public = Server.public + \
1293                 ['track_segment', 'release_segment', 'list_segments']
1294
1295        def __init__(self, *args, **kwargs):
1296            Server.__init__(self, *args, **kwargs)
1297            address = self.address
1298            # The address of Linux abstract namespaces can be bytes
1299            if isinstance(address, bytes):
1300                address = os.fsdecode(address)
1301            self.shared_memory_context = \
1302                _SharedMemoryTracker(f"shm_{address}_{getpid()}")
1303            util.debug(f"SharedMemoryServer started by pid {getpid()}")
1304
1305        def create(self, c, typeid, /, *args, **kwargs):
1306            """Create a new distributed-shared object (not backed by a shared
1307            memory block) and return its id to be used in a Proxy Object."""
1308            # Unless set up as a shared proxy, don't make shared_memory_context
1309            # a standard part of kwargs.  This makes things easier for supplying
1310            # simple functions.
1311            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1312                kwargs['shared_memory_context'] = self.shared_memory_context
1313            return Server.create(self, c, typeid, *args, **kwargs)
1314
1315        def shutdown(self, c):
1316            "Call unlink() on all tracked shared memory, terminate the Server."
1317            self.shared_memory_context.unlink()
1318            return Server.shutdown(self, c)
1319
1320        def track_segment(self, c, segment_name):
1321            "Adds the supplied shared memory block name to Server's tracker."
1322            self.shared_memory_context.register_segment(segment_name)
1323
1324        def release_segment(self, c, segment_name):
1325            """Calls unlink() on the shared memory block with the supplied name
1326            and removes it from the tracker instance inside the Server."""
1327            self.shared_memory_context.destroy_segment(segment_name)
1328
1329        def list_segments(self, c):
1330            """Returns a list of names of shared memory blocks that the Server
1331            is currently tracking."""
1332            return self.shared_memory_context.segment_names
1333
1334
1335    class SharedMemoryManager(BaseManager):
1336        """Like SyncManager but uses SharedMemoryServer instead of Server.
1337
1338        It provides methods for creating and returning SharedMemory instances
1339        and for creating a list-like object (ShareableList) backed by shared
1340        memory.  It also provides methods that create and return Proxy Objects
1341        that support synchronization across processes (i.e. multi-process-safe
1342        locks and semaphores).
1343        """
1344
1345        _Server = SharedMemoryServer
1346
1347        def __init__(self, *args, **kwargs):
1348            if os.name == "posix":
1349                # bpo-36867: Ensure the resource_tracker is running before
1350                # launching the manager process, so that concurrent
1351                # shared_memory manipulation both in the manager and in the
1352                # current process does not create two resource_tracker
1353                # processes.
1354                from . import resource_tracker
1355                resource_tracker.ensure_running()
1356            BaseManager.__init__(self, *args, **kwargs)
1357            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1358
1359        def __del__(self):
1360            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1361
1362        def get_server(self):
1363            'Better than monkeypatching for now; merge into Server ultimately'
1364            if self._state.value != State.INITIAL:
1365                if self._state.value == State.STARTED:
1366                    raise ProcessError("Already started SharedMemoryServer")
1367                elif self._state.value == State.SHUTDOWN:
1368                    raise ProcessError("SharedMemoryManager has shut down")
1369                else:
1370                    raise ProcessError(
1371                        "Unknown state {!r}".format(self._state.value))
1372            return self._Server(self._registry, self._address,
1373                                self._authkey, self._serializer)
1374
1375        def SharedMemory(self, size):
1376            """Returns a new SharedMemory instance with the specified size in
1377            bytes, to be tracked by the manager."""
1378            with self._Client(self._address, authkey=self._authkey) as conn:
1379                sms = shared_memory.SharedMemory(None, create=True, size=size)
1380                try:
1381                    dispatch(conn, None, 'track_segment', (sms.name,))
1382                except BaseException as e:
1383                    sms.unlink()
1384                    raise e
1385            return sms
1386
1387        def ShareableList(self, sequence):
1388            """Returns a new ShareableList instance populated with the values
1389            from the input sequence, to be tracked by the manager."""
1390            with self._Client(self._address, authkey=self._authkey) as conn:
1391                sl = shared_memory.ShareableList(sequence)
1392                try:
1393                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
1394                except BaseException as e:
1395                    sl.shm.unlink()
1396                    raise e
1397            return sl
1398