#
# Module providing manager classes for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]

#
# Imports
#

import sys
import threading
import signal
import array
import queue
import time
import types
import os
from os import getpid

from traceback import format_exc

from . import connection
from .context import reduction, get_spawning_popen, ProcessError
from . import pool
from . import process
from . import util
from . import get_context
try:
    from . import shared_memory
except ImportError:
    HAS_SHMEM = False
else:
    HAS_SHMEM = True
    __all__.append('SharedMemoryManager')

#
# Register some things for pickling
#

def reduce_array(a):
    '''
    Reduce an `array.array` to its constructor plus (typecode, raw bytes)
    '''
    return array.array, (a.typecode, a.tobytes())
reduction.register(array.array, reduce_array)

view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        '''
        Reduce a dict view to a plain list holding the same elements
        '''
        return list, (list(obj),)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)

#
# Type for identifying shared objects
#

class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        # __slots__ classes have no __dict__, so state is an explicit tuple
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return '%s(typeid=%r, address=%r, id=%r)' % \
               (self.__class__.__name__, self.typeid, self.address, self.id)

#
# Function for communication with a manager's server process
#

def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    Returns the payload of a '#RETURN' reply; any other reply kind is
    turned into an exception by convert_to_error() and raised.
    '''
    # `kwds` is only forwarded, never mutated, so the shared default is safe
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)

def convert_to_error(kind, result):
    '''
    Return the exception object corresponding to a non-'#RETURN' reply

    '#ERROR' replies carry the exception itself.  '#TRACEBACK' and
    '#UNSERIALIZABLE' replies must carry a str (TypeError is raised
    otherwise) and are wrapped in RemoteError.  Any other kind yields a
    ValueError.
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        if not isinstance(result, str):
            raise TypeError(
                "Result {0!r} (kind '{1}') type is {2}, not str".format(
                    result, kind, type(result)))
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        else:
            return RemoteError(result)
    else:
        return ValueError('Unrecognized message type {!r}'.format(kind))

class RemoteError(Exception):
    '''
    Error whose first argument is text received from the manager server
    '''
    def __str__(self):
        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)

#
# Functions for finding the method names of an object
#

def all_methods(obj):
    '''
    Return a list of names of methods of `obj`
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]

def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    return [name for name in all_methods(obj) if name[0] != '_']

#
# Server which is run in a process controlled by a manager
#

class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # names of the methods a client may invoke through _handle_request()
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) object tables

        Raises TypeError if `authkey` is not a bytes object.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def _handle_request(self, c):
        '''
        Authenticate connection `c`, run one public method and send the reply
        '''
        request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit raise rather than `assert`: this check is the
            # whitelist of remotely callable methods and must not vanish
            # when Python runs with -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

    def handle_request(self, conn):
        '''
        Handle a new connection
        '''
        try:
            self._handle_request(conn)
        except SystemExit:
            # Server.serve_client() calls sys.exit(0) on EOF
            pass
        finally:
            conn.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # fall back to objects only referenced from inside
                    # the manager; re-raise the original error if absent
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # result must itself be shared: register it and
                        # reply with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # handlers tried by serve_client() when a request raises AttributeError
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing; lets a client check that the server answers
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # deliberately broad: whatever happens while replying, the
            # stop event below must still be set
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, /, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  Raises ValueError if the typeid has no callable and the
        arguments are not exactly one positional object, and TypeError if
        the registered method_to_typeid is not a dict.
        '''
        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the (non-reentrant) mutex itself, so call it
        # only after the lock has been released
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object with id `ident`
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    obj, exposed, gettypeid = self.id_to_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of `ident`; dispose of it at zero

        Raises AssertionError if the recorded refcount is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]


#
# Class to represent state of a manager
#

class State(object):
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2

#
# Mapping from serializer name to Listener and Client types
#

listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }

#
# Definition of BaseManager
#

class BaseManager(object):
    '''
    Base class for managers
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # use the class-level hook, as _run_server() does, so a subclass
        # that overrides _Server gets the same server type both ways
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # the probe connection is single-use; do not leak it
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        # explicit raise (not `assert`) so the check also runs under -O
        if self._state.value != State.STARTED:
            raise AssertionError('server not yet started')
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        # _process is only assigned by start(); tolerate its absence
        proc = getattr(self, '_process', None)
        if proc is not None:
            proc.join(timeout)
            if not proc.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # give this class its own registry rather than mutating the one
        # inherited from a base class
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()): # isinstance?
                # explicit raises (not `assert`) so validation survives -O
                if type(key) is not str:
                    raise AssertionError('%r is not a string' % key)
                if type(value) is not str:
                    raise AssertionError('%r is not a string' % value)

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # drop the reference taken by create(); close the one-shot
                # connection afterwards instead of leaking it
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)

#
# Subclass of set which get cleared after a fork
#

class ProcessLocalSet(set):
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())
    def __reduce__(self):
        # pickles as a fresh, empty set of the same type
        return type(self), ()

#
# Definition of BaseProxy
#

class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's long-lived connection to the manager
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # one-shot connection used only for the decref; kept separate
            # from the thread's long-lived `conn` and closed when done
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Tell the manager about this reference and register the finalizer
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # single-use connection; close it even if the incref failed
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Finalizer: release this process's reference to the referent
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"

#
# Function used for unpickling
#

def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.
    '''
    server = getattr(process.current_process(), '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        if token.id not in server.id_to_local_proxy_obj:
            server.id_to_local_proxy_obj[token.id] = \
                server.id_to_obj[token.id]
    incref = (
        kwds.pop('incref', True) and
        not getattr(process.current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)

#
# Functions to create proxies and proxy types
#

def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`
    '''
    # `_cache` is an intentionally shared default: it memoizes the
    # generated types by (name, exposed)
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType


def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before any connection is made: the server
    # always runs the authentication handshake, so the get_methods
    # query below needs the real key too.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    proxy._isauto = True
    return proxy

#
# Types/callables which we will register with
# SyncManager
#

class Namespace(object):
    '''
    Plain attribute container; keyword arguments become attributes
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)
    def __repr__(self):
        # attributes whose name starts with '_' are left out of the repr
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))

class Value(object):
    '''
    Holder for a typecode and a single value; `lock` is accepted but unused
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value
    def get(self):
        return self._value
    def set(self, value):
        self._value = value
    def __repr__(self):
        cls_name = type(self).__name__
        return '%s(%r, %r)' % (cls_name, self._typecode, self._value)
    value = property(get, set)

def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` built from `sequence`; `lock` is unused
    '''
    return array.array(typecode, sequence)

#
# Proxy types used by SyncManager
#

class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding the iterator/generator protocol to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)


class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with acquire()/release(); usable in a `with` block
    '''
    _exposed_ = ('acquire', 'release')
    def acquire(self, blocking=True, timeout=None):
        # only forward `timeout` when the caller actually supplied one
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)
    def release(self):
        return self._callmethod('release')
    def __enter__(self):
        return self._callmethod('acquire')
    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')


class ConditionProxy(AcquirerProxy):
    '''
    Acquirer proxy that additionally forwards wait/notify/notify_all
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self, n=1):
        return self._callmethod('notify', (n,))
    def notify_all(self):
        return self._callmethod('notify_all')
    def wait_for(self, predicate, timeout=None):
        '''
        Call wait() until `predicate()` is true or `timeout` has elapsed

        The predicate is evaluated in the calling process; its last
        result is returned.
        '''
        outcome = predicate()
        if outcome:
            return outcome
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not outcome:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome


class EventProxy(BaseProxy):
    '''
    Proxy forwarding is_set/set/clear/wait to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))


class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding wait/abort/reset; attributes are read remotely
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    # each property fetches the attribute of the same name from the referent
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))


class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is forwarded to the referent

    Names starting with '_' are handled locally on the proxy itself.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        remote_call = object.__getattribute__(self, '_callmethod')
        return remote_call('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        remote_call = object.__getattribute__(self, '_callmethod')
        return remote_call('__setattr__', (key, value))
    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        remote_call = object.__getattribute__(self, '_callmethod')
        return remote_call('__delattr__', (key,))


class ValueProxy(BaseProxy):
    '''
    Proxy forwarding get()/set(), also available as the `value` property
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    value = property(get, set)

    __class_getitem__ = classmethod(types.GenericAlias)


BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    # in-place operators must return the proxy itself, not the remote result
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self


DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }


ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))


BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    # leaving the `with` block terminates the referent pool
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

#
# Definition of SharedMemoryManager and SharedMemoryServer
#

if HAS_SHMEM:
    class _SharedMemoryTracker:
        "Manages one or more shared memory segments."
1235 1236 def __init__(self, name, segment_names=[]): 1237 self.shared_memory_context_name = name 1238 self.segment_names = segment_names 1239 1240 def register_segment(self, segment_name): 1241 "Adds the supplied shared memory block name to tracker." 1242 util.debug(f"Register segment {segment_name!r} in pid {getpid()}") 1243 self.segment_names.append(segment_name) 1244 1245 def destroy_segment(self, segment_name): 1246 """Calls unlink() on the shared memory block with the supplied name 1247 and removes it from the list of blocks being tracked.""" 1248 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") 1249 self.segment_names.remove(segment_name) 1250 segment = shared_memory.SharedMemory(segment_name) 1251 segment.close() 1252 segment.unlink() 1253 1254 def unlink(self): 1255 "Calls destroy_segment() on all tracked shared memory blocks." 1256 for segment_name in self.segment_names[:]: 1257 self.destroy_segment(segment_name) 1258 1259 def __del__(self): 1260 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") 1261 self.unlink() 1262 1263 def __getstate__(self): 1264 return (self.shared_memory_context_name, self.segment_names) 1265 1266 def __setstate__(self, state): 1267 self.__init__(*state) 1268 1269 1270 class SharedMemoryServer(Server): 1271 1272 public = Server.public + \ 1273 ['track_segment', 'release_segment', 'list_segments'] 1274 1275 def __init__(self, *args, **kwargs): 1276 Server.__init__(self, *args, **kwargs) 1277 address = self.address 1278 # The address of Linux abstract namespaces can be bytes 1279 if isinstance(address, bytes): 1280 address = os.fsdecode(address) 1281 self.shared_memory_context = \ 1282 _SharedMemoryTracker(f"shm_{address}_{getpid()}") 1283 util.debug(f"SharedMemoryServer started by pid {getpid()}") 1284 1285 def create(self, c, typeid, /, *args, **kwargs): 1286 """Create a new distributed-shared object (not backed by a shared 1287 memory block) and return its id to be used in a Proxy Object.""" 
1288 # Unless set up as a shared proxy, don't make shared_memory_context 1289 # a standard part of kwargs. This makes things easier for supplying 1290 # simple functions. 1291 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): 1292 kwargs['shared_memory_context'] = self.shared_memory_context 1293 return Server.create(self, c, typeid, *args, **kwargs) 1294 1295 def shutdown(self, c): 1296 "Call unlink() on all tracked shared memory, terminate the Server." 1297 self.shared_memory_context.unlink() 1298 return Server.shutdown(self, c) 1299 1300 def track_segment(self, c, segment_name): 1301 "Adds the supplied shared memory block name to Server's tracker." 1302 self.shared_memory_context.register_segment(segment_name) 1303 1304 def release_segment(self, c, segment_name): 1305 """Calls unlink() on the shared memory block with the supplied name 1306 and removes it from the tracker instance inside the Server.""" 1307 self.shared_memory_context.destroy_segment(segment_name) 1308 1309 def list_segments(self, c): 1310 """Returns a list of names of shared memory blocks that the Server 1311 is currently tracking.""" 1312 return self.shared_memory_context.segment_names 1313 1314 1315 class SharedMemoryManager(BaseManager): 1316 """Like SyncManager but uses SharedMemoryServer instead of Server. 1317 1318 It provides methods for creating and returning SharedMemory instances 1319 and for creating a list-like object (ShareableList) backed by shared 1320 memory. It also provides methods that create and return Proxy Objects 1321 that support synchronization across processes (i.e. multi-process-safe 1322 locks and semaphores). 
1323 """ 1324 1325 _Server = SharedMemoryServer 1326 1327 def __init__(self, *args, **kwargs): 1328 if os.name == "posix": 1329 # bpo-36867: Ensure the resource_tracker is running before 1330 # launching the manager process, so that concurrent 1331 # shared_memory manipulation both in the manager and in the 1332 # current process does not create two resource_tracker 1333 # processes. 1334 from . import resource_tracker 1335 resource_tracker.ensure_running() 1336 BaseManager.__init__(self, *args, **kwargs) 1337 util.debug(f"{self.__class__.__name__} created by pid {getpid()}") 1338 1339 def __del__(self): 1340 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") 1341 pass 1342 1343 def get_server(self): 1344 'Better than monkeypatching for now; merge into Server ultimately' 1345 if self._state.value != State.INITIAL: 1346 if self._state.value == State.STARTED: 1347 raise ProcessError("Already started SharedMemoryServer") 1348 elif self._state.value == State.SHUTDOWN: 1349 raise ProcessError("SharedMemoryManager has shut down") 1350 else: 1351 raise ProcessError( 1352 "Unknown state {!r}".format(self._state.value)) 1353 return self._Server(self._registry, self._address, 1354 self._authkey, self._serializer) 1355 1356 def SharedMemory(self, size): 1357 """Returns a new SharedMemory instance with the specified size in 1358 bytes, to be tracked by the manager.""" 1359 with self._Client(self._address, authkey=self._authkey) as conn: 1360 sms = shared_memory.SharedMemory(None, create=True, size=size) 1361 try: 1362 dispatch(conn, None, 'track_segment', (sms.name,)) 1363 except BaseException as e: 1364 sms.unlink() 1365 raise e 1366 return sms 1367 1368 def ShareableList(self, sequence): 1369 """Returns a new ShareableList instance populated with the values 1370 from the input sequence, to be tracked by the manager.""" 1371 with self._Client(self._address, authkey=self._authkey) as conn: 1372 sl = shared_memory.ShareableList(sequence) 1373 try: 1374 
dispatch(conn, None, 'track_segment', (sl.shm.name,)) 1375 except BaseException as e: 1376 sl.shm.unlink() 1377 raise e 1378 return sl 1379