• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module providing manager classes for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# Licensed to PSF under a Contributor Agreement.
9#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12            'SharedMemoryManager' ]
13
14#
15# Imports
16#
17
18import sys
19import threading
20import signal
21import array
22import queue
23import time
24import os
25from os import getpid
26
27from traceback import format_exc
28
29from . import connection
30from .context import reduction, get_spawning_popen, ProcessError
31from . import pool
32from . import process
33from . import util
34from . import get_context
35try:
36    from . import shared_memory
37    HAS_SHMEM = True
38except ImportError:
39    HAS_SHMEM = False
40
41#
42# Register some things for pickling
43#
44
45def reduce_array(a):
46    return array.array, (a.typecode, a.tobytes())
47reduction.register(array.array, reduce_array)
48
49view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
50if view_types[0] is not list:       # only needed in Py3.0
51    def rebuild_as_list(obj):
52        return list, (list(obj),)
53    for view_type in view_types:
54        reduction.register(view_type, rebuild_as_list)
55
56#
57# Type for identifying shared objects
58#
59
60class Token(object):
61    '''
62    Type to uniquely indentify a shared object
63    '''
64    __slots__ = ('typeid', 'address', 'id')
65
66    def __init__(self, typeid, address, id):
67        (self.typeid, self.address, self.id) = (typeid, address, id)
68
69    def __getstate__(self):
70        return (self.typeid, self.address, self.id)
71
72    def __setstate__(self, state):
73        (self.typeid, self.address, self.id) = state
74
75    def __repr__(self):
76        return '%s(typeid=%r, address=%r, id=%r)' % \
77               (self.__class__.__name__, self.typeid, self.address, self.id)
78
79#
80# Function for communication with a manager's server process
81#
82
83def dispatch(c, id, methodname, args=(), kwds={}):
84    '''
85    Send a message to manager using connection `c` and return response
86    '''
87    c.send((id, methodname, args, kwds))
88    kind, result = c.recv()
89    if kind == '#RETURN':
90        return result
91    raise convert_to_error(kind, result)
92
93def convert_to_error(kind, result):
94    if kind == '#ERROR':
95        return result
96    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
97        if not isinstance(result, str):
98            raise TypeError(
99                "Result {0!r} (kind '{1}') type is {2}, not str".format(
100                    result, kind, type(result)))
101        if kind == '#UNSERIALIZABLE':
102            return RemoteError('Unserializable message: %s\n' % result)
103        else:
104            return RemoteError(result)
105    else:
106        return ValueError('Unrecognized message type {!r}'.format(kind))
107
108class RemoteError(Exception):
109    def __str__(self):
110        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
111
112#
113# Functions for finding the method names of an object
114#
115
116def all_methods(obj):
117    '''
118    Return a list of names of methods of `obj`
119    '''
120    temp = []
121    for name in dir(obj):
122        func = getattr(obj, name)
123        if callable(func):
124            temp.append(name)
125    return temp
126
127def public_methods(obj):
128    '''
129    Return a list of names of methods of `obj` which do not start with '_'
130    '''
131    return [name for name in all_methods(obj) if name[0] != '_']
132
133#
134# Server which is run in a process controlled by a manager
135#
136
137class Server(object):
138    '''
139    Server class which runs in a process controlled by a manager object
140    '''
141    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
142              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
143
144    def __init__(self, registry, address, authkey, serializer):
145        if not isinstance(authkey, bytes):
146            raise TypeError(
147                "Authkey {0!r} is type {1!s}, not bytes".format(
148                    authkey, type(authkey)))
149        self.registry = registry
150        self.authkey = process.AuthenticationString(authkey)
151        Listener, Client = listener_client[serializer]
152
153        # do authentication later
154        self.listener = Listener(address=address, backlog=16)
155        self.address = self.listener.address
156
157        self.id_to_obj = {'0': (None, ())}
158        self.id_to_refcount = {}
159        self.id_to_local_proxy_obj = {}
160        self.mutex = threading.Lock()
161
162    def serve_forever(self):
163        '''
164        Run the server forever
165        '''
166        self.stop_event = threading.Event()
167        process.current_process()._manager_server = self
168        try:
169            accepter = threading.Thread(target=self.accepter)
170            accepter.daemon = True
171            accepter.start()
172            try:
173                while not self.stop_event.is_set():
174                    self.stop_event.wait(1)
175            except (KeyboardInterrupt, SystemExit):
176                pass
177        finally:
178            if sys.stdout != sys.__stdout__: # what about stderr?
179                util.debug('resetting stdout, stderr')
180                sys.stdout = sys.__stdout__
181                sys.stderr = sys.__stderr__
182            sys.exit(0)
183
184    def accepter(self):
185        while True:
186            try:
187                c = self.listener.accept()
188            except OSError:
189                continue
190            t = threading.Thread(target=self.handle_request, args=(c,))
191            t.daemon = True
192            t.start()
193
194    def handle_request(self, c):
195        '''
196        Handle a new connection
197        '''
198        funcname = result = request = None
199        try:
200            connection.deliver_challenge(c, self.authkey)
201            connection.answer_challenge(c, self.authkey)
202            request = c.recv()
203            ignore, funcname, args, kwds = request
204            assert funcname in self.public, '%r unrecognized' % funcname
205            func = getattr(self, funcname)
206        except Exception:
207            msg = ('#TRACEBACK', format_exc())
208        else:
209            try:
210                result = func(c, *args, **kwds)
211            except Exception:
212                msg = ('#TRACEBACK', format_exc())
213            else:
214                msg = ('#RETURN', result)
215        try:
216            c.send(msg)
217        except Exception as e:
218            try:
219                c.send(('#TRACEBACK', format_exc()))
220            except Exception:
221                pass
222            util.info('Failure to send message: %r', msg)
223            util.info(' ... request was %r', request)
224            util.info(' ... exception was %r', e)
225
226        c.close()
227
228    def serve_client(self, conn):
229        '''
230        Handle requests from the proxies in a particular process/thread
231        '''
232        util.debug('starting server thread to service %r',
233                   threading.current_thread().name)
234
235        recv = conn.recv
236        send = conn.send
237        id_to_obj = self.id_to_obj
238
239        while not self.stop_event.is_set():
240
241            try:
242                methodname = obj = None
243                request = recv()
244                ident, methodname, args, kwds = request
245                try:
246                    obj, exposed, gettypeid = id_to_obj[ident]
247                except KeyError as ke:
248                    try:
249                        obj, exposed, gettypeid = \
250                            self.id_to_local_proxy_obj[ident]
251                    except KeyError as second_ke:
252                        raise ke
253
254                if methodname not in exposed:
255                    raise AttributeError(
256                        'method %r of %r object is not in exposed=%r' %
257                        (methodname, type(obj), exposed)
258                        )
259
260                function = getattr(obj, methodname)
261
262                try:
263                    res = function(*args, **kwds)
264                except Exception as e:
265                    msg = ('#ERROR', e)
266                else:
267                    typeid = gettypeid and gettypeid.get(methodname, None)
268                    if typeid:
269                        rident, rexposed = self.create(conn, typeid, res)
270                        token = Token(typeid, self.address, rident)
271                        msg = ('#PROXY', (rexposed, token))
272                    else:
273                        msg = ('#RETURN', res)
274
275            except AttributeError:
276                if methodname is None:
277                    msg = ('#TRACEBACK', format_exc())
278                else:
279                    try:
280                        fallback_func = self.fallback_mapping[methodname]
281                        result = fallback_func(
282                            self, conn, ident, obj, *args, **kwds
283                            )
284                        msg = ('#RETURN', result)
285                    except Exception:
286                        msg = ('#TRACEBACK', format_exc())
287
288            except EOFError:
289                util.debug('got EOF -- exiting thread serving %r',
290                           threading.current_thread().name)
291                sys.exit(0)
292
293            except Exception:
294                msg = ('#TRACEBACK', format_exc())
295
296            try:
297                try:
298                    send(msg)
299                except Exception as e:
300                    send(('#UNSERIALIZABLE', format_exc()))
301            except Exception as e:
302                util.info('exception in thread serving %r',
303                        threading.current_thread().name)
304                util.info(' ... message was %r', msg)
305                util.info(' ... exception was %r', e)
306                conn.close()
307                sys.exit(1)
308
309    def fallback_getvalue(self, conn, ident, obj):
310        return obj
311
312    def fallback_str(self, conn, ident, obj):
313        return str(obj)
314
315    def fallback_repr(self, conn, ident, obj):
316        return repr(obj)
317
318    fallback_mapping = {
319        '__str__':fallback_str,
320        '__repr__':fallback_repr,
321        '#GETVALUE':fallback_getvalue
322        }
323
324    def dummy(self, c):
325        pass
326
327    def debug_info(self, c):
328        '''
329        Return some info --- useful to spot problems with refcounting
330        '''
331        # Perhaps include debug info about 'c'?
332        with self.mutex:
333            result = []
334            keys = list(self.id_to_refcount.keys())
335            keys.sort()
336            for ident in keys:
337                if ident != '0':
338                    result.append('  %s:       refcount=%s\n    %s' %
339                                  (ident, self.id_to_refcount[ident],
340                                   str(self.id_to_obj[ident][0])[:75]))
341            return '\n'.join(result)
342
343    def number_of_objects(self, c):
344        '''
345        Number of shared objects
346        '''
347        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
348        return len(self.id_to_refcount)
349
350    def shutdown(self, c):
351        '''
352        Shutdown this process
353        '''
354        try:
355            util.debug('manager received shutdown message')
356            c.send(('#RETURN', None))
357        except:
358            import traceback
359            traceback.print_exc()
360        finally:
361            self.stop_event.set()
362
363    def create(*args, **kwds):
364        '''
365        Create a new shared object and return its id
366        '''
367        if len(args) >= 3:
368            self, c, typeid, *args = args
369        elif not args:
370            raise TypeError("descriptor 'create' of 'Server' object "
371                            "needs an argument")
372        else:
373            if 'typeid' not in kwds:
374                raise TypeError('create expected at least 2 positional '
375                                'arguments, got %d' % (len(args)-1))
376            typeid = kwds.pop('typeid')
377            if len(args) >= 2:
378                self, c, *args = args
379                import warnings
380                warnings.warn("Passing 'typeid' as keyword argument is deprecated",
381                              DeprecationWarning, stacklevel=2)
382            else:
383                if 'c' not in kwds:
384                    raise TypeError('create expected at least 2 positional '
385                                    'arguments, got %d' % (len(args)-1))
386                c = kwds.pop('c')
387                self, *args = args
388                import warnings
389                warnings.warn("Passing 'c' as keyword argument is deprecated",
390                              DeprecationWarning, stacklevel=2)
391        args = tuple(args)
392
393        with self.mutex:
394            callable, exposed, method_to_typeid, proxytype = \
395                      self.registry[typeid]
396
397            if callable is None:
398                if kwds or (len(args) != 1):
399                    raise ValueError(
400                        "Without callable, must have one non-keyword argument")
401                obj = args[0]
402            else:
403                obj = callable(*args, **kwds)
404
405            if exposed is None:
406                exposed = public_methods(obj)
407            if method_to_typeid is not None:
408                if not isinstance(method_to_typeid, dict):
409                    raise TypeError(
410                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
411                            method_to_typeid, type(method_to_typeid)))
412                exposed = list(exposed) + list(method_to_typeid)
413
414            ident = '%x' % id(obj)  # convert to string because xmlrpclib
415                                    # only has 32 bit signed integers
416            util.debug('%r callable returned object with id %r', typeid, ident)
417
418            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
419            if ident not in self.id_to_refcount:
420                self.id_to_refcount[ident] = 0
421
422        self.incref(c, ident)
423        return ident, tuple(exposed)
424    create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)'
425
426    def get_methods(self, c, token):
427        '''
428        Return the methods of the shared object indicated by token
429        '''
430        return tuple(self.id_to_obj[token.id][1])
431
432    def accept_connection(self, c, name):
433        '''
434        Spawn a new thread to serve this connection
435        '''
436        threading.current_thread().name = name
437        c.send(('#RETURN', None))
438        self.serve_client(c)
439
440    def incref(self, c, ident):
441        with self.mutex:
442            try:
443                self.id_to_refcount[ident] += 1
444            except KeyError as ke:
445                # If no external references exist but an internal (to the
446                # manager) still does and a new external reference is created
447                # from it, restore the manager's tracking of it from the
448                # previously stashed internal ref.
449                if ident in self.id_to_local_proxy_obj:
450                    self.id_to_refcount[ident] = 1
451                    self.id_to_obj[ident] = \
452                        self.id_to_local_proxy_obj[ident]
453                    obj, exposed, gettypeid = self.id_to_obj[ident]
454                    util.debug('Server re-enabled tracking & INCREF %r', ident)
455                else:
456                    raise ke
457
458    def decref(self, c, ident):
459        if ident not in self.id_to_refcount and \
460            ident in self.id_to_local_proxy_obj:
461            util.debug('Server DECREF skipping %r', ident)
462            return
463
464        with self.mutex:
465            if self.id_to_refcount[ident] <= 0:
466                raise AssertionError(
467                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
468                        ident, self.id_to_obj[ident],
469                        self.id_to_refcount[ident]))
470            self.id_to_refcount[ident] -= 1
471            if self.id_to_refcount[ident] == 0:
472                del self.id_to_refcount[ident]
473
474        if ident not in self.id_to_refcount:
475            # Two-step process in case the object turns out to contain other
476            # proxy objects (e.g. a managed list of managed lists).
477            # Otherwise, deleting self.id_to_obj[ident] would trigger the
478            # deleting of the stored value (another managed object) which would
479            # in turn attempt to acquire the mutex that is already held here.
480            self.id_to_obj[ident] = (None, (), None)  # thread-safe
481            util.debug('disposing of obj with id %r', ident)
482            with self.mutex:
483                del self.id_to_obj[ident]
484
485
486#
487# Class to represent state of a manager
488#
489
490class State(object):
491    __slots__ = ['value']
492    INITIAL = 0
493    STARTED = 1
494    SHUTDOWN = 2
495
496#
497# Mapping from serializer name to Listener and Client types
498#
499
500listener_client = {
501    'pickle' : (connection.Listener, connection.Client),
502    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
503    }
504
505#
506# Definition of BaseManager
507#
508
509class BaseManager(object):
510    '''
511    Base class for managers
512    '''
513    _registry = {}
514    _Server = Server
515
516    def __init__(self, address=None, authkey=None, serializer='pickle',
517                 ctx=None):
518        if authkey is None:
519            authkey = process.current_process().authkey
520        self._address = address     # XXX not final address if eg ('', 0)
521        self._authkey = process.AuthenticationString(authkey)
522        self._state = State()
523        self._state.value = State.INITIAL
524        self._serializer = serializer
525        self._Listener, self._Client = listener_client[serializer]
526        self._ctx = ctx or get_context()
527
528    def get_server(self):
529        '''
530        Return server object with serve_forever() method and address attribute
531        '''
532        if self._state.value != State.INITIAL:
533            if self._state.value == State.STARTED:
534                raise ProcessError("Already started server")
535            elif self._state.value == State.SHUTDOWN:
536                raise ProcessError("Manager has shut down")
537            else:
538                raise ProcessError(
539                    "Unknown state {!r}".format(self._state.value))
540        return Server(self._registry, self._address,
541                      self._authkey, self._serializer)
542
543    def connect(self):
544        '''
545        Connect manager object to the server process
546        '''
547        Listener, Client = listener_client[self._serializer]
548        conn = Client(self._address, authkey=self._authkey)
549        dispatch(conn, None, 'dummy')
550        self._state.value = State.STARTED
551
552    def start(self, initializer=None, initargs=()):
553        '''
554        Spawn a server process for this manager object
555        '''
556        if self._state.value != State.INITIAL:
557            if self._state.value == State.STARTED:
558                raise ProcessError("Already started server")
559            elif self._state.value == State.SHUTDOWN:
560                raise ProcessError("Manager has shut down")
561            else:
562                raise ProcessError(
563                    "Unknown state {!r}".format(self._state.value))
564
565        if initializer is not None and not callable(initializer):
566            raise TypeError('initializer must be a callable')
567
568        # pipe over which we will retrieve address of server
569        reader, writer = connection.Pipe(duplex=False)
570
571        # spawn process which runs a server
572        self._process = self._ctx.Process(
573            target=type(self)._run_server,
574            args=(self._registry, self._address, self._authkey,
575                  self._serializer, writer, initializer, initargs),
576            )
577        ident = ':'.join(str(i) for i in self._process._identity)
578        self._process.name = type(self).__name__  + '-' + ident
579        self._process.start()
580
581        # get address of server
582        writer.close()
583        self._address = reader.recv()
584        reader.close()
585
586        # register a finalizer
587        self._state.value = State.STARTED
588        self.shutdown = util.Finalize(
589            self, type(self)._finalize_manager,
590            args=(self._process, self._address, self._authkey,
591                  self._state, self._Client),
592            exitpriority=0
593            )
594
595    @classmethod
596    def _run_server(cls, registry, address, authkey, serializer, writer,
597                    initializer=None, initargs=()):
598        '''
599        Create a server, report its address and run it
600        '''
601        # bpo-36368: protect server process from KeyboardInterrupt signals
602        signal.signal(signal.SIGINT, signal.SIG_IGN)
603
604        if initializer is not None:
605            initializer(*initargs)
606
607        # create server
608        server = cls._Server(registry, address, authkey, serializer)
609
610        # inform parent process of the server's address
611        writer.send(server.address)
612        writer.close()
613
614        # run the manager
615        util.info('manager serving at %r', server.address)
616        server.serve_forever()
617
618    def _create(self, typeid, /, *args, **kwds):
619        '''
620        Create a new shared object; return the token and exposed tuple
621        '''
622        assert self._state.value == State.STARTED, 'server not yet started'
623        conn = self._Client(self._address, authkey=self._authkey)
624        try:
625            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
626        finally:
627            conn.close()
628        return Token(typeid, self._address, id), exposed
629
630    def join(self, timeout=None):
631        '''
632        Join the manager process (if it has been spawned)
633        '''
634        if self._process is not None:
635            self._process.join(timeout)
636            if not self._process.is_alive():
637                self._process = None
638
639    def _debug_info(self):
640        '''
641        Return some info about the servers shared objects and connections
642        '''
643        conn = self._Client(self._address, authkey=self._authkey)
644        try:
645            return dispatch(conn, None, 'debug_info')
646        finally:
647            conn.close()
648
649    def _number_of_objects(self):
650        '''
651        Return the number of shared objects
652        '''
653        conn = self._Client(self._address, authkey=self._authkey)
654        try:
655            return dispatch(conn, None, 'number_of_objects')
656        finally:
657            conn.close()
658
659    def __enter__(self):
660        if self._state.value == State.INITIAL:
661            self.start()
662        if self._state.value != State.STARTED:
663            if self._state.value == State.INITIAL:
664                raise ProcessError("Unable to start server")
665            elif self._state.value == State.SHUTDOWN:
666                raise ProcessError("Manager has shut down")
667            else:
668                raise ProcessError(
669                    "Unknown state {!r}".format(self._state.value))
670        return self
671
672    def __exit__(self, exc_type, exc_val, exc_tb):
673        self.shutdown()
674
675    @staticmethod
676    def _finalize_manager(process, address, authkey, state, _Client):
677        '''
678        Shutdown the manager process; will be registered as a finalizer
679        '''
680        if process.is_alive():
681            util.info('sending shutdown message to manager')
682            try:
683                conn = _Client(address, authkey=authkey)
684                try:
685                    dispatch(conn, None, 'shutdown')
686                finally:
687                    conn.close()
688            except Exception:
689                pass
690
691            process.join(timeout=1.0)
692            if process.is_alive():
693                util.info('manager still alive')
694                if hasattr(process, 'terminate'):
695                    util.info('trying to `terminate()` manager process')
696                    process.terminate()
697                    process.join(timeout=0.1)
698                    if process.is_alive():
699                        util.info('manager still alive after terminate')
700
701        state.value = State.SHUTDOWN
702        try:
703            del BaseProxy._address_to_local[address]
704        except KeyError:
705            pass
706
707    @property
708    def address(self):
709        return self._address
710
711    @classmethod
712    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
713                 method_to_typeid=None, create_method=True):
714        '''
715        Register a typeid with the manager type
716        '''
717        if '_registry' not in cls.__dict__:
718            cls._registry = cls._registry.copy()
719
720        if proxytype is None:
721            proxytype = AutoProxy
722
723        exposed = exposed or getattr(proxytype, '_exposed_', None)
724
725        method_to_typeid = method_to_typeid or \
726                           getattr(proxytype, '_method_to_typeid_', None)
727
728        if method_to_typeid:
729            for key, value in list(method_to_typeid.items()): # isinstance?
730                assert type(key) is str, '%r is not a string' % key
731                assert type(value) is str, '%r is not a string' % value
732
733        cls._registry[typeid] = (
734            callable, exposed, method_to_typeid, proxytype
735            )
736
737        if create_method:
738            def temp(self, /, *args, **kwds):
739                util.debug('requesting creation of a shared %r object', typeid)
740                token, exp = self._create(typeid, *args, **kwds)
741                proxy = proxytype(
742                    token, self._serializer, manager=self,
743                    authkey=self._authkey, exposed=exp
744                    )
745                conn = self._Client(token.address, authkey=self._authkey)
746                dispatch(conn, None, 'decref', (token.id,))
747                return proxy
748            temp.__name__ = typeid
749            setattr(cls, typeid, temp)
750
751#
752# Subclass of set which get cleared after a fork
753#
754
755class ProcessLocalSet(set):
756    def __init__(self):
757        util.register_after_fork(self, lambda obj: obj.clear())
758    def __reduce__(self):
759        return type(self), ()
760
761#
762# Definition of BaseProxy
763#
764
765class BaseProxy(object):
766    '''
767    A base for proxies of shared objects
768    '''
769    _address_to_local = {}
770    _mutex = util.ForkAwareThreadLock()
771
772    def __init__(self, token, serializer, manager=None,
773                 authkey=None, exposed=None, incref=True, manager_owned=False):
774        with BaseProxy._mutex:
775            tls_idset = BaseProxy._address_to_local.get(token.address, None)
776            if tls_idset is None:
777                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
778                BaseProxy._address_to_local[token.address] = tls_idset
779
780        # self._tls is used to record the connection used by this
781        # thread to communicate with the manager at token.address
782        self._tls = tls_idset[0]
783
784        # self._idset is used to record the identities of all shared
785        # objects for which the current process owns references and
786        # which are in the manager at token.address
787        self._idset = tls_idset[1]
788
789        self._token = token
790        self._id = self._token.id
791        self._manager = manager
792        self._serializer = serializer
793        self._Client = listener_client[serializer][1]
794
795        # Should be set to True only when a proxy object is being created
796        # on the manager server; primary use case: nested proxy objects.
797        # RebuildProxy detects when a proxy is being created on the manager
798        # and sets this value appropriately.
799        self._owned_by_manager = manager_owned
800
801        if authkey is not None:
802            self._authkey = process.AuthenticationString(authkey)
803        elif self._manager is not None:
804            self._authkey = self._manager._authkey
805        else:
806            self._authkey = process.current_process().authkey
807
808        if incref:
809            self._incref()
810
811        util.register_after_fork(self, BaseProxy._after_fork)
812
813    def _connect(self):
814        util.debug('making connection to manager')
815        name = process.current_process().name
816        if threading.current_thread().name != 'MainThread':
817            name += '|' + threading.current_thread().name
818        conn = self._Client(self._token.address, authkey=self._authkey)
819        dispatch(conn, None, 'accept_connection', (name,))
820        self._tls.connection = conn
821
822    def _callmethod(self, methodname, args=(), kwds={}):
823        '''
824        Try to call a method of the referrent and return a copy of the result
825        '''
826        try:
827            conn = self._tls.connection
828        except AttributeError:
829            util.debug('thread %r does not own a connection',
830                       threading.current_thread().name)
831            self._connect()
832            conn = self._tls.connection
833
834        conn.send((self._id, methodname, args, kwds))
835        kind, result = conn.recv()
836
837        if kind == '#RETURN':
838            return result
839        elif kind == '#PROXY':
840            exposed, token = result
841            proxytype = self._manager._registry[token.typeid][-1]
842            token.address = self._token.address
843            proxy = proxytype(
844                token, self._serializer, manager=self._manager,
845                authkey=self._authkey, exposed=exposed
846                )
847            conn = self._Client(token.address, authkey=self._authkey)
848            dispatch(conn, None, 'decref', (token.id,))
849            return proxy
850        raise convert_to_error(kind, result)
851
852    def _getvalue(self):
853        '''
854        Get a copy of the value of the referent
855        '''
856        return self._callmethod('#GETVALUE')
857
858    def _incref(self):
859        if self._owned_by_manager:
860            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
861            return
862
863        conn = self._Client(self._token.address, authkey=self._authkey)
864        dispatch(conn, None, 'incref', (self._id,))
865        util.debug('INCREF %r', self._token.id)
866
867        self._idset.add(self._id)
868
869        state = self._manager and self._manager._state
870
871        self._close = util.Finalize(
872            self, BaseProxy._decref,
873            args=(self._token, self._authkey, state,
874                  self._tls, self._idset, self._Client),
875            exitpriority=10
876            )
877
878    @staticmethod
879    def _decref(token, authkey, state, tls, idset, _Client):
880        idset.discard(token.id)
881
882        # check whether manager is still alive
883        if state is None or state.value == State.STARTED:
884            # tell manager this process no longer cares about referent
885            try:
886                util.debug('DECREF %r', token.id)
887                conn = _Client(token.address, authkey=authkey)
888                dispatch(conn, None, 'decref', (token.id,))
889            except Exception as e:
890                util.debug('... decref failed %s', e)
891
892        else:
893            util.debug('DECREF %r -- manager already shutdown', token.id)
894
895        # check whether we can close this thread's connection because
896        # the process owns no more references to objects for this manager
897        if not idset and hasattr(tls, 'connection'):
898            util.debug('thread %r has no more proxies so closing conn',
899                       threading.current_thread().name)
900            tls.connection.close()
901            del tls.connection
902
903    def _after_fork(self):
904        self._manager = None
905        try:
906            self._incref()
907        except Exception as e:
908            # the proxy may just be for a manager which has shutdown
909            util.info('incref failed: %s' % e)
910
911    def __reduce__(self):
912        kwds = {}
913        if get_spawning_popen() is not None:
914            kwds['authkey'] = self._authkey
915
916        if getattr(self, '_isauto', False):
917            kwds['exposed'] = self._exposed_
918            return (RebuildProxy,
919                    (AutoProxy, self._token, self._serializer, kwds))
920        else:
921            return (RebuildProxy,
922                    (type(self), self._token, self._serializer, kwds))
923
924    def __deepcopy__(self, memo):
925        return self._getvalue()
926
927    def __repr__(self):
928        return '<%s object, typeid %r at %#x>' % \
929               (type(self).__name__, self._token.typeid, id(self))
930
931    def __str__(self):
932        '''
933        Return representation of the referent (or a fall-back if that fails)
934        '''
935        try:
936            return self._callmethod('__repr__')
937        except Exception:
938            return repr(self)[:-1] + "; '__str__()' failed>"
939
940#
941# Function used for unpickling
942#
943
944def RebuildProxy(func, token, serializer, kwds):
945    '''
946    Function used for unpickling proxy objects.
947    '''
948    server = getattr(process.current_process(), '_manager_server', None)
949    if server and server.address == token.address:
950        util.debug('Rebuild a proxy owned by manager, token=%r', token)
951        kwds['manager_owned'] = True
952        if token.id not in server.id_to_local_proxy_obj:
953            server.id_to_local_proxy_obj[token.id] = \
954                server.id_to_obj[token.id]
955    incref = (
956        kwds.pop('incref', True) and
957        not getattr(process.current_process(), '_inheriting', False)
958        )
959    return func(token, serializer, incref=incref, **kwds)
960
961#
962# Functions to create proxies and proxy types
963#
964
965def MakeProxyType(name, exposed, _cache={}):
966    '''
967    Return a proxy type whose methods are given by `exposed`
968    '''
969    exposed = tuple(exposed)
970    try:
971        return _cache[(name, exposed)]
972    except KeyError:
973        pass
974
975    dic = {}
976
977    for meth in exposed:
978        exec('''def %s(self, /, *args, **kwds):
979        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
980
981    ProxyType = type(name, (BaseProxy,), dic)
982    ProxyType._exposed_ = exposed
983    _cache[(name, exposed)] = ProxyType
984    return ProxyType
985
986
987def AutoProxy(token, serializer, manager=None, authkey=None,
988              exposed=None, incref=True):
989    '''
990    Return an auto-proxy for `token`
991    '''
992    _Client = listener_client[serializer][1]
993
994    if exposed is None:
995        conn = _Client(token.address, authkey=authkey)
996        try:
997            exposed = dispatch(conn, None, 'get_methods', (token,))
998        finally:
999            conn.close()
1000
1001    if authkey is None and manager is not None:
1002        authkey = manager._authkey
1003    if authkey is None:
1004        authkey = process.current_process().authkey
1005
1006    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
1007    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
1008                      incref=incref)
1009    proxy._isauto = True
1010    return proxy
1011
1012#
1013# Types/callables which we will register with SyncManager
1014#
1015
1016class Namespace(object):
1017    def __init__(self, /, **kwds):
1018        self.__dict__.update(kwds)
1019    def __repr__(self):
1020        items = list(self.__dict__.items())
1021        temp = []
1022        for name, value in items:
1023            if not name.startswith('_'):
1024                temp.append('%s=%r' % (name, value))
1025        temp.sort()
1026        return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))
1027
1028class Value(object):
1029    def __init__(self, typecode, value, lock=True):
1030        self._typecode = typecode
1031        self._value = value
1032    def get(self):
1033        return self._value
1034    def set(self, value):
1035        self._value = value
1036    def __repr__(self):
1037        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
1038    value = property(get, set)
1039
1040def Array(typecode, sequence, lock=True):
1041    return array.array(typecode, sequence)
1042
1043#
1044# Proxy types used by SyncManager
1045#
1046
1047class IteratorProxy(BaseProxy):
1048    _exposed_ = ('__next__', 'send', 'throw', 'close')
1049    def __iter__(self):
1050        return self
1051    def __next__(self, *args):
1052        return self._callmethod('__next__', args)
1053    def send(self, *args):
1054        return self._callmethod('send', args)
1055    def throw(self, *args):
1056        return self._callmethod('throw', args)
1057    def close(self, *args):
1058        return self._callmethod('close', args)
1059
1060
1061class AcquirerProxy(BaseProxy):
1062    _exposed_ = ('acquire', 'release')
1063    def acquire(self, blocking=True, timeout=None):
1064        args = (blocking,) if timeout is None else (blocking, timeout)
1065        return self._callmethod('acquire', args)
1066    def release(self):
1067        return self._callmethod('release')
1068    def __enter__(self):
1069        return self._callmethod('acquire')
1070    def __exit__(self, exc_type, exc_val, exc_tb):
1071        return self._callmethod('release')
1072
1073
1074class ConditionProxy(AcquirerProxy):
1075    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
1076    def wait(self, timeout=None):
1077        return self._callmethod('wait', (timeout,))
1078    def notify(self, n=1):
1079        return self._callmethod('notify', (n,))
1080    def notify_all(self):
1081        return self._callmethod('notify_all')
1082    def wait_for(self, predicate, timeout=None):
1083        result = predicate()
1084        if result:
1085            return result
1086        if timeout is not None:
1087            endtime = time.monotonic() + timeout
1088        else:
1089            endtime = None
1090            waittime = None
1091        while not result:
1092            if endtime is not None:
1093                waittime = endtime - time.monotonic()
1094                if waittime <= 0:
1095                    break
1096            self.wait(waittime)
1097            result = predicate()
1098        return result
1099
1100
1101class EventProxy(BaseProxy):
1102    _exposed_ = ('is_set', 'set', 'clear', 'wait')
1103    def is_set(self):
1104        return self._callmethod('is_set')
1105    def set(self):
1106        return self._callmethod('set')
1107    def clear(self):
1108        return self._callmethod('clear')
1109    def wait(self, timeout=None):
1110        return self._callmethod('wait', (timeout,))
1111
1112
1113class BarrierProxy(BaseProxy):
1114    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
1115    def wait(self, timeout=None):
1116        return self._callmethod('wait', (timeout,))
1117    def abort(self):
1118        return self._callmethod('abort')
1119    def reset(self):
1120        return self._callmethod('reset')
1121    @property
1122    def parties(self):
1123        return self._callmethod('__getattribute__', ('parties',))
1124    @property
1125    def n_waiting(self):
1126        return self._callmethod('__getattribute__', ('n_waiting',))
1127    @property
1128    def broken(self):
1129        return self._callmethod('__getattribute__', ('broken',))
1130
1131
1132class NamespaceProxy(BaseProxy):
1133    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1134    def __getattr__(self, key):
1135        if key[0] == '_':
1136            return object.__getattribute__(self, key)
1137        callmethod = object.__getattribute__(self, '_callmethod')
1138        return callmethod('__getattribute__', (key,))
1139    def __setattr__(self, key, value):
1140        if key[0] == '_':
1141            return object.__setattr__(self, key, value)
1142        callmethod = object.__getattribute__(self, '_callmethod')
1143        return callmethod('__setattr__', (key, value))
1144    def __delattr__(self, key):
1145        if key[0] == '_':
1146            return object.__delattr__(self, key)
1147        callmethod = object.__getattribute__(self, '_callmethod')
1148        return callmethod('__delattr__', (key,))
1149
1150
1151class ValueProxy(BaseProxy):
1152    _exposed_ = ('get', 'set')
1153    def get(self):
1154        return self._callmethod('get')
1155    def set(self, value):
1156        return self._callmethod('set', (value,))
1157    value = property(get, set)
1158
1159
1160BaseListProxy = MakeProxyType('BaseListProxy', (
1161    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
1162    '__mul__', '__reversed__', '__rmul__', '__setitem__',
1163    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1164    'reverse', 'sort', '__imul__'
1165    ))
1166class ListProxy(BaseListProxy):
1167    def __iadd__(self, value):
1168        self._callmethod('extend', (value,))
1169        return self
1170    def __imul__(self, value):
1171        self._callmethod('__imul__', (value,))
1172        return self
1173
1174
1175DictProxy = MakeProxyType('DictProxy', (
1176    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
1177    '__setitem__', 'clear', 'copy', 'get', 'items',
1178    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1179    ))
1180DictProxy._method_to_typeid_ = {
1181    '__iter__': 'Iterator',
1182    }
1183
1184
1185ArrayProxy = MakeProxyType('ArrayProxy', (
1186    '__len__', '__getitem__', '__setitem__'
1187    ))
1188
1189
1190BasePoolProxy = MakeProxyType('PoolProxy', (
1191    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1192    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
1193    ))
1194BasePoolProxy._method_to_typeid_ = {
1195    'apply_async': 'AsyncResult',
1196    'map_async': 'AsyncResult',
1197    'starmap_async': 'AsyncResult',
1198    'imap': 'Iterator',
1199    'imap_unordered': 'Iterator'
1200    }
1201class PoolProxy(BasePoolProxy):
1202    def __enter__(self):
1203        return self
1204    def __exit__(self, exc_type, exc_val, exc_tb):
1205        self.terminate()
1206
1207#
1208# Definition of SyncManager
1209#
1210
1211class SyncManager(BaseManager):
1212    '''
1213    Subclass of `BaseManager` which supports a number of shared object types.
1214
1215    The types registered are those intended for the synchronization
1216    of threads, plus `dict`, `list` and `Namespace`.
1217
1218    The `multiprocessing.Manager()` function creates started instances of
1219    this class.
1220    '''
1221
1222SyncManager.register('Queue', queue.Queue)
1223SyncManager.register('JoinableQueue', queue.Queue)
1224SyncManager.register('Event', threading.Event, EventProxy)
1225SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1226SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1227SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1228SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1229                     AcquirerProxy)
1230SyncManager.register('Condition', threading.Condition, ConditionProxy)
1231SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
1232SyncManager.register('Pool', pool.Pool, PoolProxy)
1233SyncManager.register('list', list, ListProxy)
1234SyncManager.register('dict', dict, DictProxy)
1235SyncManager.register('Value', Value, ValueProxy)
1236SyncManager.register('Array', Array, ArrayProxy)
1237SyncManager.register('Namespace', Namespace, NamespaceProxy)
1238
1239# types returned by methods of PoolProxy
1240SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1241SyncManager.register('AsyncResult', create_method=False)
1242
1243#
1244# Definition of SharedMemoryManager and SharedMemoryServer
1245#
1246
1247if HAS_SHMEM:
1248    class _SharedMemoryTracker:
1249        "Manages one or more shared memory segments."
1250
1251        def __init__(self, name, segment_names=[]):
1252            self.shared_memory_context_name = name
1253            self.segment_names = segment_names
1254
1255        def register_segment(self, segment_name):
1256            "Adds the supplied shared memory block name to tracker."
1257            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1258            self.segment_names.append(segment_name)
1259
1260        def destroy_segment(self, segment_name):
1261            """Calls unlink() on the shared memory block with the supplied name
1262            and removes it from the list of blocks being tracked."""
1263            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1264            self.segment_names.remove(segment_name)
1265            segment = shared_memory.SharedMemory(segment_name)
1266            segment.close()
1267            segment.unlink()
1268
1269        def unlink(self):
1270            "Calls destroy_segment() on all tracked shared memory blocks."
1271            for segment_name in self.segment_names[:]:
1272                self.destroy_segment(segment_name)
1273
1274        def __del__(self):
1275            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1276            self.unlink()
1277
1278        def __getstate__(self):
1279            return (self.shared_memory_context_name, self.segment_names)
1280
1281        def __setstate__(self, state):
1282            self.__init__(*state)
1283
1284
1285    class SharedMemoryServer(Server):
1286
1287        public = Server.public + \
1288                 ['track_segment', 'release_segment', 'list_segments']
1289
1290        def __init__(self, *args, **kwargs):
1291            Server.__init__(self, *args, **kwargs)
1292            self.shared_memory_context = \
1293                _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
1294            util.debug(f"SharedMemoryServer started by pid {getpid()}")
1295
1296        def create(*args, **kwargs):
1297            """Create a new distributed-shared object (not backed by a shared
1298            memory block) and return its id to be used in a Proxy Object."""
1299            # Unless set up as a shared proxy, don't make shared_memory_context
1300            # a standard part of kwargs.  This makes things easier for supplying
1301            # simple functions.
1302            if len(args) >= 3:
1303                typeod = args[2]
1304            elif 'typeid' in kwargs:
1305                typeid = kwargs['typeid']
1306            elif not args:
1307                raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
1308                                "object needs an argument")
1309            else:
1310                raise TypeError('create expected at least 2 positional '
1311                                'arguments, got %d' % (len(args)-1))
1312            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1313                kwargs['shared_memory_context'] = self.shared_memory_context
1314            return Server.create(*args, **kwargs)
1315        create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'
1316
1317        def shutdown(self, c):
1318            "Call unlink() on all tracked shared memory, terminate the Server."
1319            self.shared_memory_context.unlink()
1320            return Server.shutdown(self, c)
1321
1322        def track_segment(self, c, segment_name):
1323            "Adds the supplied shared memory block name to Server's tracker."
1324            self.shared_memory_context.register_segment(segment_name)
1325
1326        def release_segment(self, c, segment_name):
1327            """Calls unlink() on the shared memory block with the supplied name
1328            and removes it from the tracker instance inside the Server."""
1329            self.shared_memory_context.destroy_segment(segment_name)
1330
1331        def list_segments(self, c):
1332            """Returns a list of names of shared memory blocks that the Server
1333            is currently tracking."""
1334            return self.shared_memory_context.segment_names
1335
1336
1337    class SharedMemoryManager(BaseManager):
1338        """Like SyncManager but uses SharedMemoryServer instead of Server.
1339
1340        It provides methods for creating and returning SharedMemory instances
1341        and for creating a list-like object (ShareableList) backed by shared
1342        memory.  It also provides methods that create and return Proxy Objects
1343        that support synchronization across processes (i.e. multi-process-safe
1344        locks and semaphores).
1345        """
1346
1347        _Server = SharedMemoryServer
1348
1349        def __init__(self, *args, **kwargs):
1350            if os.name == "posix":
1351                # bpo-36867: Ensure the resource_tracker is running before
1352                # launching the manager process, so that concurrent
1353                # shared_memory manipulation both in the manager and in the
1354                # current process does not create two resource_tracker
1355                # processes.
1356                from . import resource_tracker
1357                resource_tracker.ensure_running()
1358            BaseManager.__init__(self, *args, **kwargs)
1359            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1360
1361        def __del__(self):
1362            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1363            pass
1364
1365        def get_server(self):
1366            'Better than monkeypatching for now; merge into Server ultimately'
1367            if self._state.value != State.INITIAL:
1368                if self._state.value == State.STARTED:
1369                    raise ProcessError("Already started SharedMemoryServer")
1370                elif self._state.value == State.SHUTDOWN:
1371                    raise ProcessError("SharedMemoryManager has shut down")
1372                else:
1373                    raise ProcessError(
1374                        "Unknown state {!r}".format(self._state.value))
1375            return self._Server(self._registry, self._address,
1376                                self._authkey, self._serializer)
1377
1378        def SharedMemory(self, size):
1379            """Returns a new SharedMemory instance with the specified size in
1380            bytes, to be tracked by the manager."""
1381            with self._Client(self._address, authkey=self._authkey) as conn:
1382                sms = shared_memory.SharedMemory(None, create=True, size=size)
1383                try:
1384                    dispatch(conn, None, 'track_segment', (sms.name,))
1385                except BaseException as e:
1386                    sms.unlink()
1387                    raise e
1388            return sms
1389
1390        def ShareableList(self, sequence):
1391            """Returns a new ShareableList instance populated with the values
1392            from the input sequence, to be tracked by the manager."""
1393            with self._Client(self._address, authkey=self._authkey) as conn:
1394                sl = shared_memory.ShareableList(sequence)
1395                try:
1396                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
1397                except BaseException as e:
1398                    sl.shm.unlink()
1399                    raise e
1400            return sl
1401