1# 2# Module providing manager classes for dealing 3# with shared objects 4# 5# multiprocessing/managers.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# Licensed to PSF under a Contributor Agreement. 9# 10 11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token', 12 'SharedMemoryManager' ] 13 14# 15# Imports 16# 17 18import sys 19import threading 20import signal 21import array 22import queue 23import time 24import os 25from os import getpid 26 27from traceback import format_exc 28 29from . import connection 30from .context import reduction, get_spawning_popen, ProcessError 31from . import pool 32from . import process 33from . import util 34from . import get_context 35try: 36 from . import shared_memory 37 HAS_SHMEM = True 38except ImportError: 39 HAS_SHMEM = False 40 41# 42# Register some things for pickling 43# 44 45def reduce_array(a): 46 return array.array, (a.typecode, a.tobytes()) 47reduction.register(array.array, reduce_array) 48 49view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 50if view_types[0] is not list: # only needed in Py3.0 51 def rebuild_as_list(obj): 52 return list, (list(obj),) 53 for view_type in view_types: 54 reduction.register(view_type, rebuild_as_list) 55 56# 57# Type for identifying shared objects 58# 59 60class Token(object): 61 ''' 62 Type to uniquely indentify a shared object 63 ''' 64 __slots__ = ('typeid', 'address', 'id') 65 66 def __init__(self, typeid, address, id): 67 (self.typeid, self.address, self.id) = (typeid, address, id) 68 69 def __getstate__(self): 70 return (self.typeid, self.address, self.id) 71 72 def __setstate__(self, state): 73 (self.typeid, self.address, self.id) = state 74 75 def __repr__(self): 76 return '%s(typeid=%r, address=%r, id=%r)' % \ 77 (self.__class__.__name__, self.typeid, self.address, self.id) 78 79# 80# Function for communication with a manager's server process 81# 82 83def dispatch(c, id, methodname, args=(), kwds={}): 84 ''' 85 Send a message to manager using 
connection `c` and return response 86 ''' 87 c.send((id, methodname, args, kwds)) 88 kind, result = c.recv() 89 if kind == '#RETURN': 90 return result 91 raise convert_to_error(kind, result) 92 93def convert_to_error(kind, result): 94 if kind == '#ERROR': 95 return result 96 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): 97 if not isinstance(result, str): 98 raise TypeError( 99 "Result {0!r} (kind '{1}') type is {2}, not str".format( 100 result, kind, type(result))) 101 if kind == '#UNSERIALIZABLE': 102 return RemoteError('Unserializable message: %s\n' % result) 103 else: 104 return RemoteError(result) 105 else: 106 return ValueError('Unrecognized message type {!r}'.format(kind)) 107 108class RemoteError(Exception): 109 def __str__(self): 110 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 111 112# 113# Functions for finding the method names of an object 114# 115 116def all_methods(obj): 117 ''' 118 Return a list of names of methods of `obj` 119 ''' 120 temp = [] 121 for name in dir(obj): 122 func = getattr(obj, name) 123 if callable(func): 124 temp.append(name) 125 return temp 126 127def public_methods(obj): 128 ''' 129 Return a list of names of methods of `obj` which do not start with '_' 130 ''' 131 return [name for name in all_methods(obj) if name[0] != '_'] 132 133# 134# Server which is run in a process controlled by a manager 135# 136 137class Server(object): 138 ''' 139 Server class which runs in a process controlled by a manager object 140 ''' 141 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 142 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 143 144 def __init__(self, registry, address, authkey, serializer): 145 if not isinstance(authkey, bytes): 146 raise TypeError( 147 "Authkey {0!r} is type {1!s}, not bytes".format( 148 authkey, type(authkey))) 149 self.registry = registry 150 self.authkey = process.AuthenticationString(authkey) 151 Listener, Client = listener_client[serializer] 152 153 # do 
authentication later 154 self.listener = Listener(address=address, backlog=16) 155 self.address = self.listener.address 156 157 self.id_to_obj = {'0': (None, ())} 158 self.id_to_refcount = {} 159 self.id_to_local_proxy_obj = {} 160 self.mutex = threading.Lock() 161 162 def serve_forever(self): 163 ''' 164 Run the server forever 165 ''' 166 self.stop_event = threading.Event() 167 process.current_process()._manager_server = self 168 try: 169 accepter = threading.Thread(target=self.accepter) 170 accepter.daemon = True 171 accepter.start() 172 try: 173 while not self.stop_event.is_set(): 174 self.stop_event.wait(1) 175 except (KeyboardInterrupt, SystemExit): 176 pass 177 finally: 178 if sys.stdout != sys.__stdout__: # what about stderr? 179 util.debug('resetting stdout, stderr') 180 sys.stdout = sys.__stdout__ 181 sys.stderr = sys.__stderr__ 182 sys.exit(0) 183 184 def accepter(self): 185 while True: 186 try: 187 c = self.listener.accept() 188 except OSError: 189 continue 190 t = threading.Thread(target=self.handle_request, args=(c,)) 191 t.daemon = True 192 t.start() 193 194 def handle_request(self, c): 195 ''' 196 Handle a new connection 197 ''' 198 funcname = result = request = None 199 try: 200 connection.deliver_challenge(c, self.authkey) 201 connection.answer_challenge(c, self.authkey) 202 request = c.recv() 203 ignore, funcname, args, kwds = request 204 assert funcname in self.public, '%r unrecognized' % funcname 205 func = getattr(self, funcname) 206 except Exception: 207 msg = ('#TRACEBACK', format_exc()) 208 else: 209 try: 210 result = func(c, *args, **kwds) 211 except Exception: 212 msg = ('#TRACEBACK', format_exc()) 213 else: 214 msg = ('#RETURN', result) 215 try: 216 c.send(msg) 217 except Exception as e: 218 try: 219 c.send(('#TRACEBACK', format_exc())) 220 except Exception: 221 pass 222 util.info('Failure to send message: %r', msg) 223 util.info(' ... request was %r', request) 224 util.info(' ... 
exception was %r', e) 225 226 c.close() 227 228 def serve_client(self, conn): 229 ''' 230 Handle requests from the proxies in a particular process/thread 231 ''' 232 util.debug('starting server thread to service %r', 233 threading.current_thread().name) 234 235 recv = conn.recv 236 send = conn.send 237 id_to_obj = self.id_to_obj 238 239 while not self.stop_event.is_set(): 240 241 try: 242 methodname = obj = None 243 request = recv() 244 ident, methodname, args, kwds = request 245 try: 246 obj, exposed, gettypeid = id_to_obj[ident] 247 except KeyError as ke: 248 try: 249 obj, exposed, gettypeid = \ 250 self.id_to_local_proxy_obj[ident] 251 except KeyError as second_ke: 252 raise ke 253 254 if methodname not in exposed: 255 raise AttributeError( 256 'method %r of %r object is not in exposed=%r' % 257 (methodname, type(obj), exposed) 258 ) 259 260 function = getattr(obj, methodname) 261 262 try: 263 res = function(*args, **kwds) 264 except Exception as e: 265 msg = ('#ERROR', e) 266 else: 267 typeid = gettypeid and gettypeid.get(methodname, None) 268 if typeid: 269 rident, rexposed = self.create(conn, typeid, res) 270 token = Token(typeid, self.address, rident) 271 msg = ('#PROXY', (rexposed, token)) 272 else: 273 msg = ('#RETURN', res) 274 275 except AttributeError: 276 if methodname is None: 277 msg = ('#TRACEBACK', format_exc()) 278 else: 279 try: 280 fallback_func = self.fallback_mapping[methodname] 281 result = fallback_func( 282 self, conn, ident, obj, *args, **kwds 283 ) 284 msg = ('#RETURN', result) 285 except Exception: 286 msg = ('#TRACEBACK', format_exc()) 287 288 except EOFError: 289 util.debug('got EOF -- exiting thread serving %r', 290 threading.current_thread().name) 291 sys.exit(0) 292 293 except Exception: 294 msg = ('#TRACEBACK', format_exc()) 295 296 try: 297 try: 298 send(msg) 299 except Exception as e: 300 send(('#UNSERIALIZABLE', format_exc())) 301 except Exception as e: 302 util.info('exception in thread serving %r', 303 
threading.current_thread().name) 304 util.info(' ... message was %r', msg) 305 util.info(' ... exception was %r', e) 306 conn.close() 307 sys.exit(1) 308 309 def fallback_getvalue(self, conn, ident, obj): 310 return obj 311 312 def fallback_str(self, conn, ident, obj): 313 return str(obj) 314 315 def fallback_repr(self, conn, ident, obj): 316 return repr(obj) 317 318 fallback_mapping = { 319 '__str__':fallback_str, 320 '__repr__':fallback_repr, 321 '#GETVALUE':fallback_getvalue 322 } 323 324 def dummy(self, c): 325 pass 326 327 def debug_info(self, c): 328 ''' 329 Return some info --- useful to spot problems with refcounting 330 ''' 331 # Perhaps include debug info about 'c'? 332 with self.mutex: 333 result = [] 334 keys = list(self.id_to_refcount.keys()) 335 keys.sort() 336 for ident in keys: 337 if ident != '0': 338 result.append(' %s: refcount=%s\n %s' % 339 (ident, self.id_to_refcount[ident], 340 str(self.id_to_obj[ident][0])[:75])) 341 return '\n'.join(result) 342 343 def number_of_objects(self, c): 344 ''' 345 Number of shared objects 346 ''' 347 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' 348 return len(self.id_to_refcount) 349 350 def shutdown(self, c): 351 ''' 352 Shutdown this process 353 ''' 354 try: 355 util.debug('manager received shutdown message') 356 c.send(('#RETURN', None)) 357 except: 358 import traceback 359 traceback.print_exc() 360 finally: 361 self.stop_event.set() 362 363 def create(*args, **kwds): 364 ''' 365 Create a new shared object and return its id 366 ''' 367 if len(args) >= 3: 368 self, c, typeid, *args = args 369 elif not args: 370 raise TypeError("descriptor 'create' of 'Server' object " 371 "needs an argument") 372 else: 373 if 'typeid' not in kwds: 374 raise TypeError('create expected at least 2 positional ' 375 'arguments, got %d' % (len(args)-1)) 376 typeid = kwds.pop('typeid') 377 if len(args) >= 2: 378 self, c, *args = args 379 import warnings 380 warnings.warn("Passing 'typeid' as keyword argument 
is deprecated", 381 DeprecationWarning, stacklevel=2) 382 else: 383 if 'c' not in kwds: 384 raise TypeError('create expected at least 2 positional ' 385 'arguments, got %d' % (len(args)-1)) 386 c = kwds.pop('c') 387 self, *args = args 388 import warnings 389 warnings.warn("Passing 'c' as keyword argument is deprecated", 390 DeprecationWarning, stacklevel=2) 391 args = tuple(args) 392 393 with self.mutex: 394 callable, exposed, method_to_typeid, proxytype = \ 395 self.registry[typeid] 396 397 if callable is None: 398 if kwds or (len(args) != 1): 399 raise ValueError( 400 "Without callable, must have one non-keyword argument") 401 obj = args[0] 402 else: 403 obj = callable(*args, **kwds) 404 405 if exposed is None: 406 exposed = public_methods(obj) 407 if method_to_typeid is not None: 408 if not isinstance(method_to_typeid, dict): 409 raise TypeError( 410 "Method_to_typeid {0!r}: type {1!s}, not dict".format( 411 method_to_typeid, type(method_to_typeid))) 412 exposed = list(exposed) + list(method_to_typeid) 413 414 ident = '%x' % id(obj) # convert to string because xmlrpclib 415 # only has 32 bit signed integers 416 util.debug('%r callable returned object with id %r', typeid, ident) 417 418 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 419 if ident not in self.id_to_refcount: 420 self.id_to_refcount[ident] = 0 421 422 self.incref(c, ident) 423 return ident, tuple(exposed) 424 create.__text_signature__ = '($self, c, typeid, /, *args, **kwds)' 425 426 def get_methods(self, c, token): 427 ''' 428 Return the methods of the shared object indicated by token 429 ''' 430 return tuple(self.id_to_obj[token.id][1]) 431 432 def accept_connection(self, c, name): 433 ''' 434 Spawn a new thread to serve this connection 435 ''' 436 threading.current_thread().name = name 437 c.send(('#RETURN', None)) 438 self.serve_client(c) 439 440 def incref(self, c, ident): 441 with self.mutex: 442 try: 443 self.id_to_refcount[ident] += 1 444 except KeyError as ke: 445 # If no 
external references exist but an internal (to the 446 # manager) still does and a new external reference is created 447 # from it, restore the manager's tracking of it from the 448 # previously stashed internal ref. 449 if ident in self.id_to_local_proxy_obj: 450 self.id_to_refcount[ident] = 1 451 self.id_to_obj[ident] = \ 452 self.id_to_local_proxy_obj[ident] 453 obj, exposed, gettypeid = self.id_to_obj[ident] 454 util.debug('Server re-enabled tracking & INCREF %r', ident) 455 else: 456 raise ke 457 458 def decref(self, c, ident): 459 if ident not in self.id_to_refcount and \ 460 ident in self.id_to_local_proxy_obj: 461 util.debug('Server DECREF skipping %r', ident) 462 return 463 464 with self.mutex: 465 if self.id_to_refcount[ident] <= 0: 466 raise AssertionError( 467 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( 468 ident, self.id_to_obj[ident], 469 self.id_to_refcount[ident])) 470 self.id_to_refcount[ident] -= 1 471 if self.id_to_refcount[ident] == 0: 472 del self.id_to_refcount[ident] 473 474 if ident not in self.id_to_refcount: 475 # Two-step process in case the object turns out to contain other 476 # proxy objects (e.g. a managed list of managed lists). 477 # Otherwise, deleting self.id_to_obj[ident] would trigger the 478 # deleting of the stored value (another managed object) which would 479 # in turn attempt to acquire the mutex that is already held here. 
480 self.id_to_obj[ident] = (None, (), None) # thread-safe 481 util.debug('disposing of obj with id %r', ident) 482 with self.mutex: 483 del self.id_to_obj[ident] 484 485 486# 487# Class to represent state of a manager 488# 489 490class State(object): 491 __slots__ = ['value'] 492 INITIAL = 0 493 STARTED = 1 494 SHUTDOWN = 2 495 496# 497# Mapping from serializer name to Listener and Client types 498# 499 500listener_client = { 501 'pickle' : (connection.Listener, connection.Client), 502 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 503 } 504 505# 506# Definition of BaseManager 507# 508 509class BaseManager(object): 510 ''' 511 Base class for managers 512 ''' 513 _registry = {} 514 _Server = Server 515 516 def __init__(self, address=None, authkey=None, serializer='pickle', 517 ctx=None): 518 if authkey is None: 519 authkey = process.current_process().authkey 520 self._address = address # XXX not final address if eg ('', 0) 521 self._authkey = process.AuthenticationString(authkey) 522 self._state = State() 523 self._state.value = State.INITIAL 524 self._serializer = serializer 525 self._Listener, self._Client = listener_client[serializer] 526 self._ctx = ctx or get_context() 527 528 def get_server(self): 529 ''' 530 Return server object with serve_forever() method and address attribute 531 ''' 532 if self._state.value != State.INITIAL: 533 if self._state.value == State.STARTED: 534 raise ProcessError("Already started server") 535 elif self._state.value == State.SHUTDOWN: 536 raise ProcessError("Manager has shut down") 537 else: 538 raise ProcessError( 539 "Unknown state {!r}".format(self._state.value)) 540 return Server(self._registry, self._address, 541 self._authkey, self._serializer) 542 543 def connect(self): 544 ''' 545 Connect manager object to the server process 546 ''' 547 Listener, Client = listener_client[self._serializer] 548 conn = Client(self._address, authkey=self._authkey) 549 dispatch(conn, None, 'dummy') 550 self._state.value = 
State.STARTED 551 552 def start(self, initializer=None, initargs=()): 553 ''' 554 Spawn a server process for this manager object 555 ''' 556 if self._state.value != State.INITIAL: 557 if self._state.value == State.STARTED: 558 raise ProcessError("Already started server") 559 elif self._state.value == State.SHUTDOWN: 560 raise ProcessError("Manager has shut down") 561 else: 562 raise ProcessError( 563 "Unknown state {!r}".format(self._state.value)) 564 565 if initializer is not None and not callable(initializer): 566 raise TypeError('initializer must be a callable') 567 568 # pipe over which we will retrieve address of server 569 reader, writer = connection.Pipe(duplex=False) 570 571 # spawn process which runs a server 572 self._process = self._ctx.Process( 573 target=type(self)._run_server, 574 args=(self._registry, self._address, self._authkey, 575 self._serializer, writer, initializer, initargs), 576 ) 577 ident = ':'.join(str(i) for i in self._process._identity) 578 self._process.name = type(self).__name__ + '-' + ident 579 self._process.start() 580 581 # get address of server 582 writer.close() 583 self._address = reader.recv() 584 reader.close() 585 586 # register a finalizer 587 self._state.value = State.STARTED 588 self.shutdown = util.Finalize( 589 self, type(self)._finalize_manager, 590 args=(self._process, self._address, self._authkey, 591 self._state, self._Client), 592 exitpriority=0 593 ) 594 595 @classmethod 596 def _run_server(cls, registry, address, authkey, serializer, writer, 597 initializer=None, initargs=()): 598 ''' 599 Create a server, report its address and run it 600 ''' 601 # bpo-36368: protect server process from KeyboardInterrupt signals 602 signal.signal(signal.SIGINT, signal.SIG_IGN) 603 604 if initializer is not None: 605 initializer(*initargs) 606 607 # create server 608 server = cls._Server(registry, address, authkey, serializer) 609 610 # inform parent process of the server's address 611 writer.send(server.address) 612 
writer.close() 613 614 # run the manager 615 util.info('manager serving at %r', server.address) 616 server.serve_forever() 617 618 def _create(self, typeid, /, *args, **kwds): 619 ''' 620 Create a new shared object; return the token and exposed tuple 621 ''' 622 assert self._state.value == State.STARTED, 'server not yet started' 623 conn = self._Client(self._address, authkey=self._authkey) 624 try: 625 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 626 finally: 627 conn.close() 628 return Token(typeid, self._address, id), exposed 629 630 def join(self, timeout=None): 631 ''' 632 Join the manager process (if it has been spawned) 633 ''' 634 if self._process is not None: 635 self._process.join(timeout) 636 if not self._process.is_alive(): 637 self._process = None 638 639 def _debug_info(self): 640 ''' 641 Return some info about the servers shared objects and connections 642 ''' 643 conn = self._Client(self._address, authkey=self._authkey) 644 try: 645 return dispatch(conn, None, 'debug_info') 646 finally: 647 conn.close() 648 649 def _number_of_objects(self): 650 ''' 651 Return the number of shared objects 652 ''' 653 conn = self._Client(self._address, authkey=self._authkey) 654 try: 655 return dispatch(conn, None, 'number_of_objects') 656 finally: 657 conn.close() 658 659 def __enter__(self): 660 if self._state.value == State.INITIAL: 661 self.start() 662 if self._state.value != State.STARTED: 663 if self._state.value == State.INITIAL: 664 raise ProcessError("Unable to start server") 665 elif self._state.value == State.SHUTDOWN: 666 raise ProcessError("Manager has shut down") 667 else: 668 raise ProcessError( 669 "Unknown state {!r}".format(self._state.value)) 670 return self 671 672 def __exit__(self, exc_type, exc_val, exc_tb): 673 self.shutdown() 674 675 @staticmethod 676 def _finalize_manager(process, address, authkey, state, _Client): 677 ''' 678 Shutdown the manager process; will be registered as a finalizer 679 ''' 680 if 
process.is_alive(): 681 util.info('sending shutdown message to manager') 682 try: 683 conn = _Client(address, authkey=authkey) 684 try: 685 dispatch(conn, None, 'shutdown') 686 finally: 687 conn.close() 688 except Exception: 689 pass 690 691 process.join(timeout=1.0) 692 if process.is_alive(): 693 util.info('manager still alive') 694 if hasattr(process, 'terminate'): 695 util.info('trying to `terminate()` manager process') 696 process.terminate() 697 process.join(timeout=0.1) 698 if process.is_alive(): 699 util.info('manager still alive after terminate') 700 701 state.value = State.SHUTDOWN 702 try: 703 del BaseProxy._address_to_local[address] 704 except KeyError: 705 pass 706 707 @property 708 def address(self): 709 return self._address 710 711 @classmethod 712 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 713 method_to_typeid=None, create_method=True): 714 ''' 715 Register a typeid with the manager type 716 ''' 717 if '_registry' not in cls.__dict__: 718 cls._registry = cls._registry.copy() 719 720 if proxytype is None: 721 proxytype = AutoProxy 722 723 exposed = exposed or getattr(proxytype, '_exposed_', None) 724 725 method_to_typeid = method_to_typeid or \ 726 getattr(proxytype, '_method_to_typeid_', None) 727 728 if method_to_typeid: 729 for key, value in list(method_to_typeid.items()): # isinstance? 
730 assert type(key) is str, '%r is not a string' % key 731 assert type(value) is str, '%r is not a string' % value 732 733 cls._registry[typeid] = ( 734 callable, exposed, method_to_typeid, proxytype 735 ) 736 737 if create_method: 738 def temp(self, /, *args, **kwds): 739 util.debug('requesting creation of a shared %r object', typeid) 740 token, exp = self._create(typeid, *args, **kwds) 741 proxy = proxytype( 742 token, self._serializer, manager=self, 743 authkey=self._authkey, exposed=exp 744 ) 745 conn = self._Client(token.address, authkey=self._authkey) 746 dispatch(conn, None, 'decref', (token.id,)) 747 return proxy 748 temp.__name__ = typeid 749 setattr(cls, typeid, temp) 750 751# 752# Subclass of set which get cleared after a fork 753# 754 755class ProcessLocalSet(set): 756 def __init__(self): 757 util.register_after_fork(self, lambda obj: obj.clear()) 758 def __reduce__(self): 759 return type(self), () 760 761# 762# Definition of BaseProxy 763# 764 765class BaseProxy(object): 766 ''' 767 A base for proxies of shared objects 768 ''' 769 _address_to_local = {} 770 _mutex = util.ForkAwareThreadLock() 771 772 def __init__(self, token, serializer, manager=None, 773 authkey=None, exposed=None, incref=True, manager_owned=False): 774 with BaseProxy._mutex: 775 tls_idset = BaseProxy._address_to_local.get(token.address, None) 776 if tls_idset is None: 777 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 778 BaseProxy._address_to_local[token.address] = tls_idset 779 780 # self._tls is used to record the connection used by this 781 # thread to communicate with the manager at token.address 782 self._tls = tls_idset[0] 783 784 # self._idset is used to record the identities of all shared 785 # objects for which the current process owns references and 786 # which are in the manager at token.address 787 self._idset = tls_idset[1] 788 789 self._token = token 790 self._id = self._token.id 791 self._manager = manager 792 self._serializer = serializer 793 self._Client = 
listener_client[serializer][1] 794 795 # Should be set to True only when a proxy object is being created 796 # on the manager server; primary use case: nested proxy objects. 797 # RebuildProxy detects when a proxy is being created on the manager 798 # and sets this value appropriately. 799 self._owned_by_manager = manager_owned 800 801 if authkey is not None: 802 self._authkey = process.AuthenticationString(authkey) 803 elif self._manager is not None: 804 self._authkey = self._manager._authkey 805 else: 806 self._authkey = process.current_process().authkey 807 808 if incref: 809 self._incref() 810 811 util.register_after_fork(self, BaseProxy._after_fork) 812 813 def _connect(self): 814 util.debug('making connection to manager') 815 name = process.current_process().name 816 if threading.current_thread().name != 'MainThread': 817 name += '|' + threading.current_thread().name 818 conn = self._Client(self._token.address, authkey=self._authkey) 819 dispatch(conn, None, 'accept_connection', (name,)) 820 self._tls.connection = conn 821 822 def _callmethod(self, methodname, args=(), kwds={}): 823 ''' 824 Try to call a method of the referrent and return a copy of the result 825 ''' 826 try: 827 conn = self._tls.connection 828 except AttributeError: 829 util.debug('thread %r does not own a connection', 830 threading.current_thread().name) 831 self._connect() 832 conn = self._tls.connection 833 834 conn.send((self._id, methodname, args, kwds)) 835 kind, result = conn.recv() 836 837 if kind == '#RETURN': 838 return result 839 elif kind == '#PROXY': 840 exposed, token = result 841 proxytype = self._manager._registry[token.typeid][-1] 842 token.address = self._token.address 843 proxy = proxytype( 844 token, self._serializer, manager=self._manager, 845 authkey=self._authkey, exposed=exposed 846 ) 847 conn = self._Client(token.address, authkey=self._authkey) 848 dispatch(conn, None, 'decref', (token.id,)) 849 return proxy 850 raise convert_to_error(kind, result) 851 852 def 
_getvalue(self): 853 ''' 854 Get a copy of the value of the referent 855 ''' 856 return self._callmethod('#GETVALUE') 857 858 def _incref(self): 859 if self._owned_by_manager: 860 util.debug('owned_by_manager skipped INCREF of %r', self._token.id) 861 return 862 863 conn = self._Client(self._token.address, authkey=self._authkey) 864 dispatch(conn, None, 'incref', (self._id,)) 865 util.debug('INCREF %r', self._token.id) 866 867 self._idset.add(self._id) 868 869 state = self._manager and self._manager._state 870 871 self._close = util.Finalize( 872 self, BaseProxy._decref, 873 args=(self._token, self._authkey, state, 874 self._tls, self._idset, self._Client), 875 exitpriority=10 876 ) 877 878 @staticmethod 879 def _decref(token, authkey, state, tls, idset, _Client): 880 idset.discard(token.id) 881 882 # check whether manager is still alive 883 if state is None or state.value == State.STARTED: 884 # tell manager this process no longer cares about referent 885 try: 886 util.debug('DECREF %r', token.id) 887 conn = _Client(token.address, authkey=authkey) 888 dispatch(conn, None, 'decref', (token.id,)) 889 except Exception as e: 890 util.debug('... 
decref failed %s', e) 891 892 else: 893 util.debug('DECREF %r -- manager already shutdown', token.id) 894 895 # check whether we can close this thread's connection because 896 # the process owns no more references to objects for this manager 897 if not idset and hasattr(tls, 'connection'): 898 util.debug('thread %r has no more proxies so closing conn', 899 threading.current_thread().name) 900 tls.connection.close() 901 del tls.connection 902 903 def _after_fork(self): 904 self._manager = None 905 try: 906 self._incref() 907 except Exception as e: 908 # the proxy may just be for a manager which has shutdown 909 util.info('incref failed: %s' % e) 910 911 def __reduce__(self): 912 kwds = {} 913 if get_spawning_popen() is not None: 914 kwds['authkey'] = self._authkey 915 916 if getattr(self, '_isauto', False): 917 kwds['exposed'] = self._exposed_ 918 return (RebuildProxy, 919 (AutoProxy, self._token, self._serializer, kwds)) 920 else: 921 return (RebuildProxy, 922 (type(self), self._token, self._serializer, kwds)) 923 924 def __deepcopy__(self, memo): 925 return self._getvalue() 926 927 def __repr__(self): 928 return '<%s object, typeid %r at %#x>' % \ 929 (type(self).__name__, self._token.typeid, id(self)) 930 931 def __str__(self): 932 ''' 933 Return representation of the referent (or a fall-back if that fails) 934 ''' 935 try: 936 return self._callmethod('__repr__') 937 except Exception: 938 return repr(self)[:-1] + "; '__str__()' failed>" 939 940# 941# Function used for unpickling 942# 943 944def RebuildProxy(func, token, serializer, kwds): 945 ''' 946 Function used for unpickling proxy objects. 
947 ''' 948 server = getattr(process.current_process(), '_manager_server', None) 949 if server and server.address == token.address: 950 util.debug('Rebuild a proxy owned by manager, token=%r', token) 951 kwds['manager_owned'] = True 952 if token.id not in server.id_to_local_proxy_obj: 953 server.id_to_local_proxy_obj[token.id] = \ 954 server.id_to_obj[token.id] 955 incref = ( 956 kwds.pop('incref', True) and 957 not getattr(process.current_process(), '_inheriting', False) 958 ) 959 return func(token, serializer, incref=incref, **kwds) 960 961# 962# Functions to create proxies and proxy types 963# 964 965def MakeProxyType(name, exposed, _cache={}): 966 ''' 967 Return a proxy type whose methods are given by `exposed` 968 ''' 969 exposed = tuple(exposed) 970 try: 971 return _cache[(name, exposed)] 972 except KeyError: 973 pass 974 975 dic = {} 976 977 for meth in exposed: 978 exec('''def %s(self, /, *args, **kwds): 979 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) 980 981 ProxyType = type(name, (BaseProxy,), dic) 982 ProxyType._exposed_ = exposed 983 _cache[(name, exposed)] = ProxyType 984 return ProxyType 985 986 987def AutoProxy(token, serializer, manager=None, authkey=None, 988 exposed=None, incref=True): 989 ''' 990 Return an auto-proxy for `token` 991 ''' 992 _Client = listener_client[serializer][1] 993 994 if exposed is None: 995 conn = _Client(token.address, authkey=authkey) 996 try: 997 exposed = dispatch(conn, None, 'get_methods', (token,)) 998 finally: 999 conn.close() 1000 1001 if authkey is None and manager is not None: 1002 authkey = manager._authkey 1003 if authkey is None: 1004 authkey = process.current_process().authkey 1005 1006 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 1007 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 1008 incref=incref) 1009 proxy._isauto = True 1010 return proxy 1011 1012# 1013# Types/callables which we will register with SyncManager 1014# 1015 1016class 
Namespace(object): 1017 def __init__(self, /, **kwds): 1018 self.__dict__.update(kwds) 1019 def __repr__(self): 1020 items = list(self.__dict__.items()) 1021 temp = [] 1022 for name, value in items: 1023 if not name.startswith('_'): 1024 temp.append('%s=%r' % (name, value)) 1025 temp.sort() 1026 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) 1027 1028class Value(object): 1029 def __init__(self, typecode, value, lock=True): 1030 self._typecode = typecode 1031 self._value = value 1032 def get(self): 1033 return self._value 1034 def set(self, value): 1035 self._value = value 1036 def __repr__(self): 1037 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 1038 value = property(get, set) 1039 1040def Array(typecode, sequence, lock=True): 1041 return array.array(typecode, sequence) 1042 1043# 1044# Proxy types used by SyncManager 1045# 1046 1047class IteratorProxy(BaseProxy): 1048 _exposed_ = ('__next__', 'send', 'throw', 'close') 1049 def __iter__(self): 1050 return self 1051 def __next__(self, *args): 1052 return self._callmethod('__next__', args) 1053 def send(self, *args): 1054 return self._callmethod('send', args) 1055 def throw(self, *args): 1056 return self._callmethod('throw', args) 1057 def close(self, *args): 1058 return self._callmethod('close', args) 1059 1060 1061class AcquirerProxy(BaseProxy): 1062 _exposed_ = ('acquire', 'release') 1063 def acquire(self, blocking=True, timeout=None): 1064 args = (blocking,) if timeout is None else (blocking, timeout) 1065 return self._callmethod('acquire', args) 1066 def release(self): 1067 return self._callmethod('release') 1068 def __enter__(self): 1069 return self._callmethod('acquire') 1070 def __exit__(self, exc_type, exc_val, exc_tb): 1071 return self._callmethod('release') 1072 1073 1074class ConditionProxy(AcquirerProxy): 1075 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 1076 def wait(self, timeout=None): 1077 return self._callmethod('wait', (timeout,)) 1078 
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition variable.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have elapsed.

        The predicate is evaluated locally, in the calling process; only
        the individual wait() calls are forwarded.  Returns the last value
        of the predicate.
        '''
        outcome = predicate()
        if outcome:
            return outcome
        deadline = None if timeout is None else time.monotonic() + timeout
        while not outcome:
            remaining = None
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    # Timed out: hand back the (false) predicate value.
                    break
            self.wait(remaining)
            outcome = predicate()
        return outcome


class EventProxy(BaseProxy):
    '''
    Proxy for an event object.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))


class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier; the read-only attributes are fetched through the
    referent's `__getattribute__`.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))

    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))

    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))


class NamespaceProxy(BaseProxy):
    '''
    Proxy forwarding attribute access to the referent.

    Names starting with an underscore are handled locally on the proxy
    itself; every other name is forwarded.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # Look `_callmethod` up via object to avoid re-entering
            # this hook.
            forward = object.__getattribute__(self, '_callmethod')
            return forward('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            forward = object.__getattribute__(self, '_callmethod')
            return forward('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            forward = object.__getattribute__(self, '_callmethod')
            return forward('__delattr__', (key,))
        return object.__delattr__(self, key)
class ValueProxy(BaseProxy):
    '''
    Proxy exposing get()/set() plus a `value` property built on them.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        return self._callmethod('set', (value,))

    value = property(fget=get, fset=set)


# Generated base class for list proxies; the in-place operators are added
# by hand in ListProxy so that they can return the proxy itself.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose `+=` and `*=` act on the referent and return the proxy.
    '''
    def __iadd__(self, value):
        # `+=` is carried out as an extend() on the referent.
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self


DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# The result of __iter__ is handed back as an 'Iterator' typeid.
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}


ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)


BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# Typeids under which the results of these methods are returned.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    terminate().
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Queues are registered without an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)

# (typeid, callable, proxytype) triples, registered in this exact order.
for _typeid, _factory, _proxytype in (
        ('Event', threading.Event, EventProxy),
        ('Lock', threading.Lock, AcquirerProxy),
        ('RLock', threading.RLock, AcquirerProxy),
        ('Semaphore', threading.Semaphore, AcquirerProxy),
        ('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy),
        ('Condition', threading.Condition, ConditionProxy),
        ('Barrier', threading.Barrier, BarrierProxy),
        ('Pool', pool.Pool, PoolProxy),
        ('list', list, ListProxy),
        ('dict', dict, DictProxy),
        ('Value', Value, ValueProxy),
        ('Array', Array, ArrayProxy),
        ('Namespace', Namespace, NamespaceProxy),
):
    SyncManager.register(_typeid, _factory, _proxytype)
# Do not leave the loop variables behind in the module namespace.
del _typeid, _factory, _proxytype

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

#
# Definition of SharedMemoryManager and SharedMemoryServer
#
1250 1251 def __init__(self, name, segment_names=[]): 1252 self.shared_memory_context_name = name 1253 self.segment_names = segment_names 1254 1255 def register_segment(self, segment_name): 1256 "Adds the supplied shared memory block name to tracker." 1257 util.debug(f"Register segment {segment_name!r} in pid {getpid()}") 1258 self.segment_names.append(segment_name) 1259 1260 def destroy_segment(self, segment_name): 1261 """Calls unlink() on the shared memory block with the supplied name 1262 and removes it from the list of blocks being tracked.""" 1263 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") 1264 self.segment_names.remove(segment_name) 1265 segment = shared_memory.SharedMemory(segment_name) 1266 segment.close() 1267 segment.unlink() 1268 1269 def unlink(self): 1270 "Calls destroy_segment() on all tracked shared memory blocks." 1271 for segment_name in self.segment_names[:]: 1272 self.destroy_segment(segment_name) 1273 1274 def __del__(self): 1275 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") 1276 self.unlink() 1277 1278 def __getstate__(self): 1279 return (self.shared_memory_context_name, self.segment_names) 1280 1281 def __setstate__(self, state): 1282 self.__init__(*state) 1283 1284 1285 class SharedMemoryServer(Server): 1286 1287 public = Server.public + \ 1288 ['track_segment', 'release_segment', 'list_segments'] 1289 1290 def __init__(self, *args, **kwargs): 1291 Server.__init__(self, *args, **kwargs) 1292 self.shared_memory_context = \ 1293 _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}") 1294 util.debug(f"SharedMemoryServer started by pid {getpid()}") 1295 1296 def create(*args, **kwargs): 1297 """Create a new distributed-shared object (not backed by a shared 1298 memory block) and return its id to be used in a Proxy Object.""" 1299 # Unless set up as a shared proxy, don't make shared_memory_context 1300 # a standard part of kwargs. This makes things easier for supplying 1301 # simple functions. 
if HAS_SHMEM:
    class SharedMemoryServer(Server):
        """Server which additionally keeps track of shared memory blocks
        and unlinks them on shutdown."""

        public = Server.public + \
                 ['track_segment', 'release_segment', 'list_segments']

        def __init__(self, *args, **kwargs):
            Server.__init__(self, *args, **kwargs)
            self.shared_memory_context = \
                _SharedMemoryTracker(f"shmm_{self.address}_{getpid()}")
            util.debug(f"SharedMemoryServer started by pid {getpid()}")

        def create(*args, **kwargs):
            """Create a new distributed-shared object (not backed by a shared
            memory block) and return its id to be used in a Proxy Object.

            Called as ``create(self, c, typeid, *args, **kwargs)``.  The
            signature is kept as ``*args, **kwargs`` so that ``typeid`` may
            still be supplied by keyword.  Raises TypeError when ``self`` or
            ``typeid`` is missing.
            """
            # `self` and `typeid` have to be pulled out of *args by hand
            # because the signature does not name them.
            if not args:
                raise TypeError("descriptor 'create' of 'SharedMemoryServer' "
                                "object needs an argument")
            self = args[0]
            if len(args) >= 3:
                typeid = args[2]
            elif 'typeid' in kwargs:
                typeid = kwargs['typeid']
            else:
                raise TypeError('create expected at least 2 positional '
                                'arguments, got %d' % (len(args)-1))
            # Unless set up as a shared proxy, don't make shared_memory_context
            # a standard part of kwargs.  This makes things easier for supplying
            # simple functions.
            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
                kwargs['shared_memory_context'] = self.shared_memory_context
            return Server.create(*args, **kwargs)
        create.__text_signature__ = '($self, c, typeid, /, *args, **kwargs)'

        def shutdown(self, c):
            "Call unlink() on all tracked shared memory, terminate the Server."
            self.shared_memory_context.unlink()
            return Server.shutdown(self, c)

        def track_segment(self, c, segment_name):
            "Adds the supplied shared memory block name to Server's tracker."
            self.shared_memory_context.register_segment(segment_name)

        def release_segment(self, c, segment_name):
            """Calls unlink() on the shared memory block with the supplied name
            and removes it from the tracker instance inside the Server."""
            self.shared_memory_context.destroy_segment(segment_name)

        def list_segments(self, c):
            """Returns a list of names of shared memory blocks that the Server
            is currently tracking."""
            return self.shared_memory_context.segment_names
if HAS_SHMEM:
    class SharedMemoryManager(BaseManager):
        """Like SyncManager but uses SharedMemoryServer instead of Server.

        It provides methods for creating and returning SharedMemory instances
        and for creating a list-like object (ShareableList) backed by shared
        memory.  It also provides methods that create and return Proxy Objects
        that support synchronization across processes (i.e. multi-process-safe
        locks and semaphores).
        """

        _Server = SharedMemoryServer

        def __init__(self, *args, **kwargs):
            if os.name == "posix":
                # bpo-36867: Ensure the resource_tracker is running before
                # launching the manager process, so that concurrent
                # shared_memory manipulation both in the manager and in the
                # current process does not create two resource_tracker
                # processes.
                from . import resource_tracker
                resource_tracker.ensure_running()
            BaseManager.__init__(self, *args, **kwargs)
            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

        def __del__(self):
            # Only logs; no cleanup is performed here.
            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")

        def get_server(self):
            """Return a new server object for this manager.

            Raises ProcessError unless the manager is still in its initial
            state.

            Better than monkeypatching for now; merge into Server ultimately.
            """
            state = self._state.value
            if state != State.INITIAL:
                if state == State.STARTED:
                    raise ProcessError("Already started SharedMemoryServer")
                elif state == State.SHUTDOWN:
                    raise ProcessError("SharedMemoryManager has shut down")
                else:
                    raise ProcessError(
                        "Unknown state {!r}".format(state))
            return self._Server(self._registry, self._address,
                                self._authkey, self._serializer)

        def SharedMemory(self, size):
            """Returns a new SharedMemory instance with the specified size in
            bytes, to be tracked by the manager."""
            with self._Client(self._address, authkey=self._authkey) as conn:
                sms = shared_memory.SharedMemory(None, create=True, size=size)
                try:
                    dispatch(conn, None, 'track_segment', (sms.name,))
                except BaseException:
                    # The manager never learned about the block, so nobody
                    # else would unlink it; re-raise with the original
                    # traceback intact.
                    sms.unlink()
                    raise
            return sms

        def ShareableList(self, sequence):
            """Returns a new ShareableList instance populated with the values
            from the input sequence, to be tracked by the manager."""
            with self._Client(self._address, authkey=self._authkey) as conn:
                sl = shared_memory.ShareableList(sequence)
                try:
                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
                except BaseException:
                    # Same clean-up as in SharedMemory() above.
                    sl.shm.unlink()
                    raise
            return sl