1# 2# Module providing manager classes for dealing 3# with shared objects 4# 5# multiprocessing/managers.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# Licensed to PSF under a Contributor Agreement. 9# 10 11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] 12 13# 14# Imports 15# 16 17import sys 18import threading 19import signal 20import array 21import queue 22import time 23import types 24import os 25from os import getpid 26 27from traceback import format_exc 28 29from . import connection 30from .context import reduction, get_spawning_popen, ProcessError 31from . import pool 32from . import process 33from . import util 34from . import get_context 35try: 36 from . import shared_memory 37except ImportError: 38 HAS_SHMEM = False 39else: 40 HAS_SHMEM = True 41 __all__.append('SharedMemoryManager') 42 43# 44# Register some things for pickling 45# 46 47def reduce_array(a): 48 return array.array, (a.typecode, a.tobytes()) 49reduction.register(array.array, reduce_array) 50 51view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 52def rebuild_as_list(obj): 53 return list, (list(obj),) 54for view_type in view_types: 55 reduction.register(view_type, rebuild_as_list) 56del view_type, view_types 57 58# 59# Type for identifying shared objects 60# 61 62class Token(object): 63 ''' 64 Type to uniquely identify a shared object 65 ''' 66 __slots__ = ('typeid', 'address', 'id') 67 68 def __init__(self, typeid, address, id): 69 (self.typeid, self.address, self.id) = (typeid, address, id) 70 71 def __getstate__(self): 72 return (self.typeid, self.address, self.id) 73 74 def __setstate__(self, state): 75 (self.typeid, self.address, self.id) = state 76 77 def __repr__(self): 78 return '%s(typeid=%r, address=%r, id=%r)' % \ 79 (self.__class__.__name__, self.typeid, self.address, self.id) 80 81# 82# Function for communication with a manager's server process 83# 84 85def dispatch(c, id, methodname, args=(), kwds={}): 86 ''' 87 Send a message to manager using connection `c` and return response 88 ''' 89 c.send((id, methodname, args, kwds)) 90 kind, result = c.recv() 91 if kind == '#RETURN': 92 return result 93 try: 94 raise convert_to_error(kind, result) 95 finally: 96 del result # break reference cycle 97 98def convert_to_error(kind, result): 99 if kind == '#ERROR': 100 return result 101 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): 102 if not isinstance(result, str): 103 raise TypeError( 104 "Result {0!r} (kind '{1}') type is {2}, not str".format( 105 result, kind, type(result))) 106 if kind == '#UNSERIALIZABLE': 107 return RemoteError('Unserializable message: %s\n' % result) 108 else: 109 return RemoteError(result) 110 else: 111 return ValueError('Unrecognized message type {!r}'.format(kind)) 112 113class RemoteError(Exception): 114 def __str__(self): 115 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 116 117# 118# Functions for finding the method names of an object 119# 120 121def all_methods(obj): 122 ''' 123 Return a list of names of methods of `obj` 124 ''' 125 temp = [] 126 for name in dir(obj): 127 func = getattr(obj, name) 128 if callable(func): 129 temp.append(name) 130 return temp 131 132def public_methods(obj): 133 ''' 134 Return a list of names of methods of `obj` which do not start with '_' 135 ''' 136 return [name for name in all_methods(obj) if name[0] != '_'] 137 138# 139# Server which is run in a process controlled by a manager 140# 141 142class Server(object): 143 ''' 144 Server class which runs in a process controlled by a manager object 145 ''' 146 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 147 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 148 149 def __init__(self, registry, address, authkey, serializer): 150 if not isinstance(authkey, bytes): 151 raise TypeError( 152 "Authkey {0!r} is type {1!s}, not bytes".format( 153 authkey, type(authkey))) 154 self.registry = registry 155 self.authkey = process.AuthenticationString(authkey) 156 Listener, Client = listener_client[serializer] 157 158 # do authentication later 159 self.listener = Listener(address=address, backlog=128) 160 self.address = self.listener.address 161 162 self.id_to_obj = {'0': (None, ())} 163 self.id_to_refcount = {} 164 self.id_to_local_proxy_obj = {} 165 self.mutex = threading.Lock() 166 167 def serve_forever(self): 168 ''' 169 Run the server forever 170 ''' 171 self.stop_event = threading.Event() 172 process.current_process()._manager_server = self 173 try: 174 accepter = threading.Thread(target=self.accepter) 175 accepter.daemon = True 176 accepter.start() 177 try: 178 while not self.stop_event.is_set(): 179 self.stop_event.wait(1) 180 except (KeyboardInterrupt, SystemExit): 181 pass 182 finally: 183 if sys.stdout != sys.__stdout__: # what about stderr? 184 util.debug('resetting stdout, stderr') 185 sys.stdout = sys.__stdout__ 186 sys.stderr = sys.__stderr__ 187 sys.exit(0) 188 189 def accepter(self): 190 while True: 191 try: 192 c = self.listener.accept() 193 except OSError: 194 continue 195 t = threading.Thread(target=self.handle_request, args=(c,)) 196 t.daemon = True 197 t.start() 198 199 def _handle_request(self, c): 200 request = None 201 try: 202 connection.deliver_challenge(c, self.authkey) 203 connection.answer_challenge(c, self.authkey) 204 request = c.recv() 205 ignore, funcname, args, kwds = request 206 assert funcname in self.public, '%r unrecognized' % funcname 207 func = getattr(self, funcname) 208 except Exception: 209 msg = ('#TRACEBACK', format_exc()) 210 else: 211 try: 212 result = func(c, *args, **kwds) 213 except Exception: 214 msg = ('#TRACEBACK', format_exc()) 215 else: 216 msg = ('#RETURN', result) 217 218 try: 219 c.send(msg) 220 except Exception as e: 221 try: 222 c.send(('#TRACEBACK', format_exc())) 223 except Exception: 224 pass 225 util.info('Failure to send message: %r', msg) 226 util.info(' ... request was %r', request) 227 util.info(' ... exception was %r', e) 228 229 def handle_request(self, conn): 230 ''' 231 Handle a new connection 232 ''' 233 try: 234 self._handle_request(conn) 235 except SystemExit: 236 # Server.serve_client() calls sys.exit(0) on EOF 237 pass 238 finally: 239 conn.close() 240 241 def serve_client(self, conn): 242 ''' 243 Handle requests from the proxies in a particular process/thread 244 ''' 245 util.debug('starting server thread to service %r', 246 threading.current_thread().name) 247 248 recv = conn.recv 249 send = conn.send 250 id_to_obj = self.id_to_obj 251 252 while not self.stop_event.is_set(): 253 254 try: 255 methodname = obj = None 256 request = recv() 257 ident, methodname, args, kwds = request 258 try: 259 obj, exposed, gettypeid = id_to_obj[ident] 260 except KeyError as ke: 261 try: 262 obj, exposed, gettypeid = \ 263 self.id_to_local_proxy_obj[ident] 264 except KeyError: 265 raise ke 266 267 if methodname not in exposed: 268 raise AttributeError( 269 'method %r of %r object is not in exposed=%r' % 270 (methodname, type(obj), exposed) 271 ) 272 273 function = getattr(obj, methodname) 274 275 try: 276 res = function(*args, **kwds) 277 except Exception as e: 278 msg = ('#ERROR', e) 279 else: 280 typeid = gettypeid and gettypeid.get(methodname, None) 281 if typeid: 282 rident, rexposed = self.create(conn, typeid, res) 283 token = Token(typeid, self.address, rident) 284 msg = ('#PROXY', (rexposed, token)) 285 else: 286 msg = ('#RETURN', res) 287 288 except AttributeError: 289 if methodname is None: 290 msg = ('#TRACEBACK', format_exc()) 291 else: 292 try: 293 fallback_func = self.fallback_mapping[methodname] 294 result = fallback_func( 295 self, conn, ident, obj, *args, **kwds 296 ) 297 msg = ('#RETURN', result) 298 except Exception: 299 msg = ('#TRACEBACK', format_exc()) 300 301 except EOFError: 302 util.debug('got EOF -- exiting thread serving %r', 303 threading.current_thread().name) 304 sys.exit(0) 305 306 except Exception: 307 msg = ('#TRACEBACK', format_exc()) 308 309 try: 310 try: 311 send(msg) 312 except Exception: 313 send(('#UNSERIALIZABLE', format_exc())) 314 except Exception as e: 315 util.info('exception in thread serving %r', 316 threading.current_thread().name) 317 util.info(' ... message was %r', msg) 318 util.info(' ... exception was %r', e) 319 conn.close() 320 sys.exit(1) 321 322 def fallback_getvalue(self, conn, ident, obj): 323 return obj 324 325 def fallback_str(self, conn, ident, obj): 326 return str(obj) 327 328 def fallback_repr(self, conn, ident, obj): 329 return repr(obj) 330 331 fallback_mapping = { 332 '__str__':fallback_str, 333 '__repr__':fallback_repr, 334 '#GETVALUE':fallback_getvalue 335 } 336 337 def dummy(self, c): 338 pass 339 340 def debug_info(self, c): 341 ''' 342 Return some info --- useful to spot problems with refcounting 343 ''' 344 # Perhaps include debug info about 'c'? 345 with self.mutex: 346 result = [] 347 keys = list(self.id_to_refcount.keys()) 348 keys.sort() 349 for ident in keys: 350 if ident != '0': 351 result.append(' %s: refcount=%s\n %s' % 352 (ident, self.id_to_refcount[ident], 353 str(self.id_to_obj[ident][0])[:75])) 354 return '\n'.join(result) 355 356 def number_of_objects(self, c): 357 ''' 358 Number of shared objects 359 ''' 360 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' 361 return len(self.id_to_refcount) 362 363 def shutdown(self, c): 364 ''' 365 Shutdown this process 366 ''' 367 try: 368 util.debug('manager received shutdown message') 369 c.send(('#RETURN', None)) 370 except: 371 import traceback 372 traceback.print_exc() 373 finally: 374 self.stop_event.set() 375 376 def create(self, c, typeid, /, *args, **kwds): 377 ''' 378 Create a new shared object and return its id 379 ''' 380 with self.mutex: 381 callable, exposed, method_to_typeid, proxytype = \ 382 self.registry[typeid] 383 384 if callable is None: 385 if kwds or (len(args) != 1): 386 raise ValueError( 387 "Without callable, must have one non-keyword argument") 388 obj = args[0] 389 else: 390 obj = callable(*args, **kwds) 391 392 if exposed is None: 393 exposed = public_methods(obj) 394 if method_to_typeid is not None: 395 if not isinstance(method_to_typeid, dict): 396 raise TypeError( 397 "Method_to_typeid {0!r}: type {1!s}, not dict".format( 398 method_to_typeid, type(method_to_typeid))) 399 exposed = list(exposed) + list(method_to_typeid) 400 401 ident = '%x' % id(obj) # convert to string because xmlrpclib 402 # only has 32 bit signed integers 403 util.debug('%r callable returned object with id %r', typeid, ident) 404 405 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 406 if ident not in self.id_to_refcount: 407 self.id_to_refcount[ident] = 0 408 409 self.incref(c, ident) 410 return ident, tuple(exposed) 411 412 def get_methods(self, c, token): 413 ''' 414 Return the methods of the shared object indicated by token 415 ''' 416 return tuple(self.id_to_obj[token.id][1]) 417 418 def accept_connection(self, c, name): 419 ''' 420 Spawn a new thread to serve this connection 421 ''' 422 threading.current_thread().name = name 423 c.send(('#RETURN', None)) 424 self.serve_client(c) 425 426 def incref(self, c, ident): 427 with self.mutex: 428 try: 429 self.id_to_refcount[ident] += 1 430 except KeyError as ke: 431 # If no external references exist but an internal (to the 432 # manager) still does and a new external reference is created 433 # from it, restore the manager's tracking of it from the 434 # previously stashed internal ref. 435 if ident in self.id_to_local_proxy_obj: 436 self.id_to_refcount[ident] = 1 437 self.id_to_obj[ident] = \ 438 self.id_to_local_proxy_obj[ident] 439 util.debug('Server re-enabled tracking & INCREF %r', ident) 440 else: 441 raise ke 442 443 def decref(self, c, ident): 444 if ident not in self.id_to_refcount and \ 445 ident in self.id_to_local_proxy_obj: 446 util.debug('Server DECREF skipping %r', ident) 447 return 448 449 with self.mutex: 450 if self.id_to_refcount[ident] <= 0: 451 raise AssertionError( 452 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( 453 ident, self.id_to_obj[ident], 454 self.id_to_refcount[ident])) 455 self.id_to_refcount[ident] -= 1 456 if self.id_to_refcount[ident] == 0: 457 del self.id_to_refcount[ident] 458 459 if ident not in self.id_to_refcount: 460 # Two-step process in case the object turns out to contain other 461 # proxy objects (e.g. a managed list of managed lists). 462 # Otherwise, deleting self.id_to_obj[ident] would trigger the 463 # deleting of the stored value (another managed object) which would 464 # in turn attempt to acquire the mutex that is already held here. 465 self.id_to_obj[ident] = (None, (), None) # thread-safe 466 util.debug('disposing of obj with id %r', ident) 467 with self.mutex: 468 del self.id_to_obj[ident] 469 470 471# 472# Class to represent state of a manager 473# 474 475class State(object): 476 __slots__ = ['value'] 477 INITIAL = 0 478 STARTED = 1 479 SHUTDOWN = 2 480 481# 482# Mapping from serializer name to Listener and Client types 483# 484 485listener_client = { 486 'pickle' : (connection.Listener, connection.Client), 487 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 488 } 489 490# 491# Definition of BaseManager 492# 493 494class BaseManager(object): 495 ''' 496 Base class for managers 497 ''' 498 _registry = {} 499 _Server = Server 500 501 def __init__(self, address=None, authkey=None, serializer='pickle', 502 ctx=None, *, shutdown_timeout=1.0): 503 if authkey is None: 504 authkey = process.current_process().authkey 505 self._address = address # XXX not final address if eg ('', 0) 506 self._authkey = process.AuthenticationString(authkey) 507 self._state = State() 508 self._state.value = State.INITIAL 509 self._serializer = serializer 510 self._Listener, self._Client = listener_client[serializer] 511 self._ctx = ctx or get_context() 512 self._shutdown_timeout = shutdown_timeout 513 514 def get_server(self): 515 ''' 516 Return server object with serve_forever() method and address attribute 517 ''' 518 if self._state.value != State.INITIAL: 519 if self._state.value == State.STARTED: 520 raise ProcessError("Already started server") 521 elif self._state.value == State.SHUTDOWN: 522 raise ProcessError("Manager has shut down") 523 else: 524 raise ProcessError( 525 "Unknown state {!r}".format(self._state.value)) 526 return Server(self._registry, self._address, 527 self._authkey, self._serializer) 528 529 def connect(self): 530 ''' 531 Connect manager object to the server process 532 ''' 533 Listener, Client = listener_client[self._serializer] 534 conn = Client(self._address, authkey=self._authkey) 535 dispatch(conn, None, 'dummy') 536 self._state.value = State.STARTED 537 538 def start(self, initializer=None, initargs=()): 539 ''' 540 Spawn a server process for this manager object 541 ''' 542 if self._state.value != State.INITIAL: 543 if self._state.value == State.STARTED: 544 raise ProcessError("Already started server") 545 elif self._state.value == State.SHUTDOWN: 546 raise ProcessError("Manager has shut down") 547 else: 548 raise ProcessError( 549 "Unknown state {!r}".format(self._state.value)) 550 551 if initializer is not None and not callable(initializer): 552 raise TypeError('initializer must be a callable') 553 554 # pipe over which we will retrieve address of server 555 reader, writer = connection.Pipe(duplex=False) 556 557 # spawn process which runs a server 558 self._process = self._ctx.Process( 559 target=type(self)._run_server, 560 args=(self._registry, self._address, self._authkey, 561 self._serializer, writer, initializer, initargs), 562 ) 563 ident = ':'.join(str(i) for i in self._process._identity) 564 self._process.name = type(self).__name__ + '-' + ident 565 self._process.start() 566 567 # get address of server 568 writer.close() 569 self._address = reader.recv() 570 reader.close() 571 572 # register a finalizer 573 self._state.value = State.STARTED 574 self.shutdown = util.Finalize( 575 self, type(self)._finalize_manager, 576 args=(self._process, self._address, self._authkey, self._state, 577 self._Client, self._shutdown_timeout), 578 exitpriority=0 579 ) 580 581 @classmethod 582 def _run_server(cls, registry, address, authkey, serializer, writer, 583 initializer=None, initargs=()): 584 ''' 585 Create a server, report its address and run it 586 ''' 587 # bpo-36368: protect server process from KeyboardInterrupt signals 588 signal.signal(signal.SIGINT, signal.SIG_IGN) 589 590 if initializer is not None: 591 initializer(*initargs) 592 593 # create server 594 server = cls._Server(registry, address, authkey, serializer) 595 596 # inform parent process of the server's address 597 writer.send(server.address) 598 writer.close() 599 600 # run the manager 601 util.info('manager serving at %r', server.address) 602 server.serve_forever() 603 604 def _create(self, typeid, /, *args, **kwds): 605 ''' 606 Create a new shared object; return the token and exposed tuple 607 ''' 608 assert self._state.value == State.STARTED, 'server not yet started' 609 conn = self._Client(self._address, authkey=self._authkey) 610 try: 611 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 612 finally: 613 conn.close() 614 return Token(typeid, self._address, id), exposed 615 616 def join(self, timeout=None): 617 ''' 618 Join the manager process (if it has been spawned) 619 ''' 620 if self._process is not None: 621 self._process.join(timeout) 622 if not self._process.is_alive(): 623 self._process = None 624 625 def _debug_info(self): 626 ''' 627 Return some info about the servers shared objects and connections 628 ''' 629 conn = self._Client(self._address, authkey=self._authkey) 630 try: 631 return dispatch(conn, None, 'debug_info') 632 finally: 633 conn.close() 634 635 def _number_of_objects(self): 636 ''' 637 Return the number of shared objects 638 ''' 639 conn = self._Client(self._address, authkey=self._authkey) 640 try: 641 return dispatch(conn, None, 'number_of_objects') 642 finally: 643 conn.close() 644 645 def __enter__(self): 646 if self._state.value == State.INITIAL: 647 self.start() 648 if self._state.value != State.STARTED: 649 if self._state.value == State.INITIAL: 650 raise ProcessError("Unable to start server") 651 elif self._state.value == State.SHUTDOWN: 652 raise ProcessError("Manager has shut down") 653 else: 654 raise ProcessError( 655 "Unknown state {!r}".format(self._state.value)) 656 return self 657 658 def __exit__(self, exc_type, exc_val, exc_tb): 659 self.shutdown() 660 661 @staticmethod 662 def _finalize_manager(process, address, authkey, state, _Client, 663 shutdown_timeout): 664 ''' 665 Shutdown the manager process; will be registered as a finalizer 666 ''' 667 if process.is_alive(): 668 util.info('sending shutdown message to manager') 669 try: 670 conn = _Client(address, authkey=authkey) 671 try: 672 dispatch(conn, None, 'shutdown') 673 finally: 674 conn.close() 675 except Exception: 676 pass 677 678 process.join(timeout=shutdown_timeout) 679 if process.is_alive(): 680 util.info('manager still alive') 681 if hasattr(process, 'terminate'): 682 util.info('trying to `terminate()` manager process') 683 process.terminate() 684 process.join(timeout=shutdown_timeout) 685 if process.is_alive(): 686 util.info('manager still alive after terminate') 687 process.kill() 688 process.join() 689 690 state.value = State.SHUTDOWN 691 try: 692 del BaseProxy._address_to_local[address] 693 except KeyError: 694 pass 695 696 @property 697 def address(self): 698 return self._address 699 700 @classmethod 701 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 702 method_to_typeid=None, create_method=True): 703 ''' 704 Register a typeid with the manager type 705 ''' 706 if '_registry' not in cls.__dict__: 707 cls._registry = cls._registry.copy() 708 709 if proxytype is None: 710 proxytype = AutoProxy 711 712 exposed = exposed or getattr(proxytype, '_exposed_', None) 713 714 method_to_typeid = method_to_typeid or \ 715 getattr(proxytype, '_method_to_typeid_', None) 716 717 if method_to_typeid: 718 for key, value in list(method_to_typeid.items()): # isinstance? 719 assert type(key) is str, '%r is not a string' % key 720 assert type(value) is str, '%r is not a string' % value 721 722 cls._registry[typeid] = ( 723 callable, exposed, method_to_typeid, proxytype 724 ) 725 726 if create_method: 727 def temp(self, /, *args, **kwds): 728 util.debug('requesting creation of a shared %r object', typeid) 729 token, exp = self._create(typeid, *args, **kwds) 730 proxy = proxytype( 731 token, self._serializer, manager=self, 732 authkey=self._authkey, exposed=exp 733 ) 734 conn = self._Client(token.address, authkey=self._authkey) 735 dispatch(conn, None, 'decref', (token.id,)) 736 return proxy 737 temp.__name__ = typeid 738 setattr(cls, typeid, temp) 739 740# 741# Subclass of set which get cleared after a fork 742# 743 744class ProcessLocalSet(set): 745 def __init__(self): 746 util.register_after_fork(self, lambda obj: obj.clear()) 747 def __reduce__(self): 748 return type(self), () 749 750# 751# Definition of BaseProxy 752# 753 754class BaseProxy(object): 755 ''' 756 A base for proxies of shared objects 757 ''' 758 _address_to_local = {} 759 _mutex = util.ForkAwareThreadLock() 760 761 # Each instance gets a `_serial` number. Unlike `id(...)`, this number 762 # is never reused. 763 _next_serial = 1 764 765 def __init__(self, token, serializer, manager=None, 766 authkey=None, exposed=None, incref=True, manager_owned=False): 767 with BaseProxy._mutex: 768 tls_serials = BaseProxy._address_to_local.get(token.address, None) 769 if tls_serials is None: 770 tls_serials = util.ForkAwareLocal(), ProcessLocalSet() 771 BaseProxy._address_to_local[token.address] = tls_serials 772 773 self._serial = BaseProxy._next_serial 774 BaseProxy._next_serial += 1 775 776 # self._tls is used to record the connection used by this 777 # thread to communicate with the manager at token.address 778 self._tls = tls_serials[0] 779 780 # self._all_serials is a set used to record the identities of all 781 # shared objects for which the current process owns references and 782 # which are in the manager at token.address 783 self._all_serials = tls_serials[1] 784 785 self._token = token 786 self._id = self._token.id 787 self._manager = manager 788 self._serializer = serializer 789 self._Client = listener_client[serializer][1] 790 791 # Should be set to True only when a proxy object is being created 792 # on the manager server; primary use case: nested proxy objects. 793 # RebuildProxy detects when a proxy is being created on the manager 794 # and sets this value appropriately. 795 self._owned_by_manager = manager_owned 796 797 if authkey is not None: 798 self._authkey = process.AuthenticationString(authkey) 799 elif self._manager is not None: 800 self._authkey = self._manager._authkey 801 else: 802 self._authkey = process.current_process().authkey 803 804 if incref: 805 self._incref() 806 807 util.register_after_fork(self, BaseProxy._after_fork) 808 809 def _connect(self): 810 util.debug('making connection to manager') 811 name = process.current_process().name 812 if threading.current_thread().name != 'MainThread': 813 name += '|' + threading.current_thread().name 814 conn = self._Client(self._token.address, authkey=self._authkey) 815 dispatch(conn, None, 'accept_connection', (name,)) 816 self._tls.connection = conn 817 818 def _callmethod(self, methodname, args=(), kwds={}): 819 ''' 820 Try to call a method of the referent and return a copy of the result 821 ''' 822 try: 823 conn = self._tls.connection 824 except AttributeError: 825 util.debug('thread %r does not own a connection', 826 threading.current_thread().name) 827 self._connect() 828 conn = self._tls.connection 829 830 conn.send((self._id, methodname, args, kwds)) 831 kind, result = conn.recv() 832 833 if kind == '#RETURN': 834 return result 835 elif kind == '#PROXY': 836 exposed, token = result 837 proxytype = self._manager._registry[token.typeid][-1] 838 token.address = self._token.address 839 proxy = proxytype( 840 token, self._serializer, manager=self._manager, 841 authkey=self._authkey, exposed=exposed 842 ) 843 conn = self._Client(token.address, authkey=self._authkey) 844 dispatch(conn, None, 'decref', (token.id,)) 845 return proxy 846 try: 847 raise convert_to_error(kind, result) 848 finally: 849 del result # break reference cycle 850 851 def _getvalue(self): 852 ''' 853 Get a copy of the value of the referent 854 ''' 855 return self._callmethod('#GETVALUE') 856 857 def _incref(self): 858 if self._owned_by_manager: 859 util.debug('owned_by_manager skipped INCREF of %r', self._token.id) 860 return 861 862 conn = self._Client(self._token.address, authkey=self._authkey) 863 dispatch(conn, None, 'incref', (self._id,)) 864 util.debug('INCREF %r', self._token.id) 865 866 self._all_serials.add(self._serial) 867 868 state = self._manager and self._manager._state 869 870 self._close = util.Finalize( 871 self, BaseProxy._decref, 872 args=(self._token, self._serial, self._authkey, state, 873 self._tls, self._all_serials, self._Client), 874 exitpriority=10 875 ) 876 877 @staticmethod 878 def _decref(token, serial, authkey, state, tls, idset, _Client): 879 idset.discard(serial) 880 881 # check whether manager is still alive 882 if state is None or state.value == State.STARTED: 883 # tell manager this process no longer cares about referent 884 try: 885 util.debug('DECREF %r', token.id) 886 conn = _Client(token.address, authkey=authkey) 887 dispatch(conn, None, 'decref', (token.id,)) 888 except Exception as e: 889 util.debug('... decref failed %s', e) 890 891 else: 892 util.debug('DECREF %r -- manager already shutdown', token.id) 893 894 # check whether we can close this thread's connection because 895 # the process owns no more references to objects for this manager 896 if not idset and hasattr(tls, 'connection'): 897 util.debug('thread %r has no more proxies so closing conn', 898 threading.current_thread().name) 899 tls.connection.close() 900 del tls.connection 901 902 def _after_fork(self): 903 self._manager = None 904 try: 905 self._incref() 906 except Exception as e: 907 # the proxy may just be for a manager which has shutdown 908 util.info('incref failed: %s' % e) 909 910 def __reduce__(self): 911 kwds = {} 912 if get_spawning_popen() is not None: 913 kwds['authkey'] = self._authkey 914 915 if getattr(self, '_isauto', False): 916 kwds['exposed'] = self._exposed_ 917 return (RebuildProxy, 918 (AutoProxy, self._token, self._serializer, kwds)) 919 else: 920 return (RebuildProxy, 921 (type(self), self._token, self._serializer, kwds)) 922 923 def __deepcopy__(self, memo): 924 return self._getvalue() 925 926 def __repr__(self): 927 return '<%s object, typeid %r at %#x>' % \ 928 (type(self).__name__, self._token.typeid, id(self)) 929 930 def __str__(self): 931 ''' 932 Return representation of the referent (or a fall-back if that fails) 933 ''' 934 try: 935 return self._callmethod('__repr__') 936 except Exception: 937 return repr(self)[:-1] + "; '__str__()' failed>" 938 939# 940# Function used for unpickling 941# 942 943def RebuildProxy(func, token, serializer, kwds): 944 ''' 945 Function used for unpickling proxy objects. 946 ''' 947 server = getattr(process.current_process(), '_manager_server', None) 948 if server and server.address == token.address: 949 util.debug('Rebuild a proxy owned by manager, token=%r', token) 950 kwds['manager_owned'] = True 951 if token.id not in server.id_to_local_proxy_obj: 952 server.id_to_local_proxy_obj[token.id] = \ 953 server.id_to_obj[token.id] 954 incref = ( 955 kwds.pop('incref', True) and 956 not getattr(process.current_process(), '_inheriting', False) 957 ) 958 return func(token, serializer, incref=incref, **kwds) 959 960# 961# Functions to create proxies and proxy types 962# 963 964def MakeProxyType(name, exposed, _cache={}): 965 ''' 966 Return a proxy type whose methods are given by `exposed` 967 ''' 968 exposed = tuple(exposed) 969 try: 970 return _cache[(name, exposed)] 971 except KeyError: 972 pass 973 974 dic = {} 975 976 for meth in exposed: 977 exec('''def %s(self, /, *args, **kwds): 978 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) 979 980 ProxyType = type(name, (BaseProxy,), dic) 981 ProxyType._exposed_ = exposed 982 _cache[(name, exposed)] = ProxyType 983 return ProxyType 984 985 986def AutoProxy(token, serializer, manager=None, authkey=None, 987 exposed=None, incref=True, manager_owned=False): 988 ''' 989 Return an auto-proxy for `token` 990 ''' 991 _Client = listener_client[serializer][1] 992 993 if exposed is None: 994 conn = _Client(token.address, authkey=authkey) 995 try: 996 exposed = dispatch(conn, None, 'get_methods', (token,)) 997 finally: 998 conn.close() 999 1000 if authkey is None and manager is not None: 1001 authkey = manager._authkey 1002 if authkey is None: 1003 authkey = process.current_process().authkey 1004 1005 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 1006 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 1007 incref=incref, manager_owned=manager_owned) 1008 proxy._isauto = True 1009 return proxy 1010 1011# 1012# Types/callables which we will register with SyncManager 1013# 1014 1015class Namespace(object): 1016 def __init__(self, /, **kwds): 1017 self.__dict__.update(kwds) 1018 def __repr__(self): 1019 items = list(self.__dict__.items()) 1020 temp = [] 1021 for name, value in items: 1022 if not name.startswith('_'): 1023 temp.append('%s=%r' % (name, value)) 1024 temp.sort() 1025 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) 1026 1027class Value(object): 1028 def __init__(self, typecode, value, lock=True): 1029 self._typecode = typecode 1030 self._value = value 1031 def get(self): 1032 return self._value 1033 def set(self, value): 1034 self._value = value 1035 def __repr__(self): 1036 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 1037 value = property(get, set) 1038 1039def Array(typecode, sequence, lock=True): 1040 return array.array(typecode, sequence) 1041 1042# 1043# Proxy types used by SyncManager 1044# 1045 1046class IteratorProxy(BaseProxy): 1047 _exposed_ = ('__next__', 'send', 'throw', 'close') 1048 def __iter__(self): 1049 return self 1050 def __next__(self, *args): 1051 return self._callmethod('__next__', args) 1052 def send(self, *args): 1053 return self._callmethod('send', args) 1054 def throw(self, *args): 1055 return self._callmethod('throw', args) 1056 def close(self, *args): 1057 return self._callmethod('close', args) 1058 1059 1060class AcquirerProxy(BaseProxy): 1061 _exposed_ = ('acquire', 'release') 1062 def acquire(self, blocking=True, timeout=None): 1063 args = (blocking,) if timeout is None else (blocking, timeout) 1064 return self._callmethod('acquire', args) 1065 def release(self): 1066 return self._callmethod('release') 1067 def __enter__(self): 1068 return self._callmethod('acquire') 1069 def __exit__(self, exc_type, exc_val, exc_tb): 1070 return self._callmethod('release') 1071 1072 1073class ConditionProxy(AcquirerProxy): 1074 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 1075 def wait(self, timeout=None): 1076 return self._callmethod('wait', (timeout,)) 1077 def notify(self, n=1): 1078 return self._callmethod('notify', (n,)) 1079 def notify_all(self): 1080 return self._callmethod('notify_all') 1081 def wait_for(self, predicate, timeout=None): 1082 result = predicate() 1083 if result: 1084 return result 1085 if timeout is not None: 1086 endtime = time.monotonic() + timeout 1087 else: 1088 endtime = None 1089 waittime = None 1090 while not result: 1091 if endtime is not None: 1092 waittime = endtime - time.monotonic() 1093 if waittime <= 0: 1094 break 1095 self.wait(waittime) 1096 result = predicate() 1097 return result 1098 1099 1100class EventProxy(BaseProxy): 1101 _exposed_ = ('is_set', 'set', 'clear', 'wait') 1102 def is_set(self): 1103 return self._callmethod('is_set') 1104 def set(self): 1105 return self._callmethod('set') 1106 def clear(self): 1107 return self._callmethod('clear') 1108 def wait(self, timeout=None): 1109 return self._callmethod('wait', (timeout,)) 1110 1111 1112class BarrierProxy(BaseProxy): 1113 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') 1114 def wait(self, timeout=None): 1115 return self._callmethod('wait', (timeout,)) 1116 def abort(self): 1117 return self._callmethod('abort') 1118 def reset(self): 1119 return self._callmethod('reset') 1120 @property 1121 def parties(self): 1122 return self._callmethod('__getattribute__', ('parties',)) 1123 @property 1124 def n_waiting(self): 1125 return self._callmethod('__getattribute__', ('n_waiting',)) 1126 @property 1127 def broken(self): 1128 return self._callmethod('__getattribute__', ('broken',)) 1129 1130 1131class NamespaceProxy(BaseProxy): 1132 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1133 def __getattr__(self, key): 1134 if key[0] == '_': 1135 return object.__getattribute__(self, key) 1136 callmethod = object.__getattribute__(self, '_callmethod') 1137 return callmethod('__getattribute__', (key,)) 1138 def __setattr__(self, key, value): 1139 if key[0] == '_': 1140 return object.__setattr__(self, key, value) 1141 callmethod = object.__getattribute__(self, '_callmethod') 1142 return callmethod('__setattr__', (key, value)) 1143 def __delattr__(self, key): 1144 if key[0] == '_': 1145 return object.__delattr__(self, key) 1146 callmethod = object.__getattribute__(self, '_callmethod') 1147 return callmethod('__delattr__', (key,)) 1148 1149 1150class ValueProxy(BaseProxy): 1151 _exposed_ = ('get', 'set') 1152 def get(self): 1153 return self._callmethod('get') 1154 def set(self, value): 1155 return self._callmethod('set', (value,)) 1156 value = property(get, set) 1157 1158 __class_getitem__ = classmethod(types.GenericAlias) 1159 1160 1161BaseListProxy = MakeProxyType('BaseListProxy', ( 1162 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', 1163 '__mul__', '__reversed__', '__rmul__', '__setitem__', 1164 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1165 'reverse', 'sort', '__imul__' 1166 )) 1167class ListProxy(BaseListProxy): 1168 def __iadd__(self, value): 1169 self._callmethod('extend', (value,)) 1170 return self 1171 def __imul__(self, value): 1172 self._callmethod('__imul__', (value,)) 1173 return self 1174 1175 __class_getitem__ = classmethod(types.GenericAlias) 1176 1177 1178_BaseDictProxy = MakeProxyType('DictProxy', ( 1179 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', 1180 '__setitem__', 'clear', 'copy', 'get', 'items', 1181 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1182 )) 1183_BaseDictProxy._method_to_typeid_ = { 1184 '__iter__': 'Iterator', 1185 } 1186class DictProxy(_BaseDictProxy): 1187 __class_getitem__ = classmethod(types.GenericAlias) 1188 1189 1190ArrayProxy = MakeProxyType('ArrayProxy', ( 1191 '__len__', '__getitem__', '__setitem__' 1192 )) 1193 1194 1195BasePoolProxy = MakeProxyType('PoolProxy', ( 1196 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1197 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', 1198 )) 1199BasePoolProxy._method_to_typeid_ = { 1200 'apply_async': 'AsyncResult', 1201 'map_async': 'AsyncResult', 1202 'starmap_async': 'AsyncResult', 1203 'imap': 'Iterator', 1204 'imap_unordered': 'Iterator' 1205 } 1206class PoolProxy(BasePoolProxy): 1207 def __enter__(self): 1208 return self 1209 def __exit__(self, exc_type, exc_val, exc_tb): 1210 self.terminate() 1211 1212# 1213# Definition of SyncManager 1214# 1215 1216class SyncManager(BaseManager): 1217 ''' 1218 Subclass of `BaseManager` which supports a number of shared object types. 1219 1220 The types registered are those intended for the synchronization 1221 of threads, plus `dict`, `list` and `Namespace`. 1222 1223 The `multiprocessing.Manager()` function creates started instances of 1224 this class. 1225 ''' 1226 1227SyncManager.register('Queue', queue.Queue) 1228SyncManager.register('JoinableQueue', queue.Queue) 1229SyncManager.register('Event', threading.Event, EventProxy) 1230SyncManager.register('Lock', threading.Lock, AcquirerProxy) 1231SyncManager.register('RLock', threading.RLock, AcquirerProxy) 1232SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1233SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1234 AcquirerProxy) 1235SyncManager.register('Condition', threading.Condition, ConditionProxy) 1236SyncManager.register('Barrier', threading.Barrier, BarrierProxy) 1237SyncManager.register('Pool', pool.Pool, PoolProxy) 1238SyncManager.register('list', list, ListProxy) 1239SyncManager.register('dict', dict, DictProxy) 1240SyncManager.register('Value', Value, ValueProxy) 1241SyncManager.register('Array', Array, ArrayProxy) 1242SyncManager.register('Namespace', Namespace, NamespaceProxy) 1243 1244# types returned by methods of PoolProxy 1245SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1246SyncManager.register('AsyncResult', create_method=False) 1247 1248# 1249# Definition of SharedMemoryManager and SharedMemoryServer 1250# 1251 1252if HAS_SHMEM: 1253 class _SharedMemoryTracker: 1254 "Manages one or more shared memory segments." 1255 1256 def __init__(self, name, segment_names=[]): 1257 self.shared_memory_context_name = name 1258 self.segment_names = segment_names 1259 1260 def register_segment(self, segment_name): 1261 "Adds the supplied shared memory block name to tracker." 1262 util.debug(f"Register segment {segment_name!r} in pid {getpid()}") 1263 self.segment_names.append(segment_name) 1264 1265 def destroy_segment(self, segment_name): 1266 """Calls unlink() on the shared memory block with the supplied name 1267 and removes it from the list of blocks being tracked.""" 1268 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") 1269 self.segment_names.remove(segment_name) 1270 segment = shared_memory.SharedMemory(segment_name) 1271 segment.close() 1272 segment.unlink() 1273 1274 def unlink(self): 1275 "Calls destroy_segment() on all tracked shared memory blocks." 1276 for segment_name in self.segment_names[:]: 1277 self.destroy_segment(segment_name) 1278 1279 def __del__(self): 1280 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") 1281 self.unlink() 1282 1283 def __getstate__(self): 1284 return (self.shared_memory_context_name, self.segment_names) 1285 1286 def __setstate__(self, state): 1287 self.__init__(*state) 1288 1289 1290 class SharedMemoryServer(Server): 1291 1292 public = Server.public + \ 1293 ['track_segment', 'release_segment', 'list_segments'] 1294 1295 def __init__(self, *args, **kwargs): 1296 Server.__init__(self, *args, **kwargs) 1297 address = self.address 1298 # The address of Linux abstract namespaces can be bytes 1299 if isinstance(address, bytes): 1300 address = os.fsdecode(address) 1301 self.shared_memory_context = \ 1302 _SharedMemoryTracker(f"shm_{address}_{getpid()}") 1303 util.debug(f"SharedMemoryServer started by pid {getpid()}") 1304 1305 def create(self, c, typeid, /, *args, **kwargs): 1306 """Create a new distributed-shared object (not backed by a shared 1307 memory block) and return its id to be used in a Proxy Object.""" 1308 # Unless set up as a shared proxy, don't make shared_memory_context 1309 # a standard part of kwargs. This makes things easier for supplying 1310 # simple functions. 1311 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): 1312 kwargs['shared_memory_context'] = self.shared_memory_context 1313 return Server.create(self, c, typeid, *args, **kwargs) 1314 1315 def shutdown(self, c): 1316 "Call unlink() on all tracked shared memory, terminate the Server." 1317 self.shared_memory_context.unlink() 1318 return Server.shutdown(self, c) 1319 1320 def track_segment(self, c, segment_name): 1321 "Adds the supplied shared memory block name to Server's tracker." 1322 self.shared_memory_context.register_segment(segment_name) 1323 1324 def release_segment(self, c, segment_name): 1325 """Calls unlink() on the shared memory block with the supplied name 1326 and removes it from the tracker instance inside the Server.""" 1327 self.shared_memory_context.destroy_segment(segment_name) 1328 1329 def list_segments(self, c): 1330 """Returns a list of names of shared memory blocks that the Server 1331 is currently tracking.""" 1332 return self.shared_memory_context.segment_names 1333 1334 1335 class SharedMemoryManager(BaseManager): 1336 """Like SyncManager but uses SharedMemoryServer instead of Server. 1337 1338 It provides methods for creating and returning SharedMemory instances 1339 and for creating a list-like object (ShareableList) backed by shared 1340 memory. It also provides methods that create and return Proxy Objects 1341 that support synchronization across processes (i.e. multi-process-safe 1342 locks and semaphores). 1343 """ 1344 1345 _Server = SharedMemoryServer 1346 1347 def __init__(self, *args, **kwargs): 1348 if os.name == "posix": 1349 # bpo-36867: Ensure the resource_tracker is running before 1350 # launching the manager process, so that concurrent 1351 # shared_memory manipulation both in the manager and in the 1352 # current process does not create two resource_tracker 1353 # processes. 1354 from . import resource_tracker 1355 resource_tracker.ensure_running() 1356 BaseManager.__init__(self, *args, **kwargs) 1357 util.debug(f"{self.__class__.__name__} created by pid {getpid()}") 1358 1359 def __del__(self): 1360 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") 1361 1362 def get_server(self): 1363 'Better than monkeypatching for now; merge into Server ultimately' 1364 if self._state.value != State.INITIAL: 1365 if self._state.value == State.STARTED: 1366 raise ProcessError("Already started SharedMemoryServer") 1367 elif self._state.value == State.SHUTDOWN: 1368 raise ProcessError("SharedMemoryManager has shut down") 1369 else: 1370 raise ProcessError( 1371 "Unknown state {!r}".format(self._state.value)) 1372 return self._Server(self._registry, self._address, 1373 self._authkey, self._serializer) 1374 1375 def SharedMemory(self, size): 1376 """Returns a new SharedMemory instance with the specified size in 1377 bytes, to be tracked by the manager.""" 1378 with self._Client(self._address, authkey=self._authkey) as conn: 1379 sms = shared_memory.SharedMemory(None, create=True, size=size) 1380 try: 1381 dispatch(conn, None, 'track_segment', (sms.name,)) 1382 except BaseException as e: 1383 sms.unlink() 1384 raise e 1385 return sms 1386 1387 def ShareableList(self, sequence): 1388 """Returns a new ShareableList instance populated with the values 1389 from the input sequence, to be tracked by the manager.""" 1390 with self._Client(self._address, authkey=self._authkey) as conn: 1391 sl = shared_memory.ShareableList(sequence) 1392 try: 1393 dispatch(conn, None, 'track_segment', (sl.shm.name,)) 1394 except BaseException as e: 1395 sl.shm.unlink() 1396 raise e 1397 return sl 1398