1# 2# Module providing manager classes for dealing 3# with shared objects 4# 5# multiprocessing/managers.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# Licensed to PSF under a Contributor Agreement. 9# 10 11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token', 12 'SharedMemoryManager' ] 13 14# 15# Imports 16# 17 18import sys 19import threading 20import signal 21import array 22import queue 23import time 24import types 25import os 26from os import getpid 27 28from traceback import format_exc 29 30from . import connection 31from .context import reduction, get_spawning_popen, ProcessError 32from . import pool 33from . import process 34from . import util 35from . import get_context 36try: 37 from . import shared_memory 38 HAS_SHMEM = True 39except ImportError: 40 HAS_SHMEM = False 41 42# 43# Register some things for pickling 44# 45 46def reduce_array(a): 47 return array.array, (a.typecode, a.tobytes()) 48reduction.register(array.array, reduce_array) 49 50view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 51if view_types[0] is not list: # only needed in Py3.0 52 def rebuild_as_list(obj): 53 return list, (list(obj),) 54 for view_type in view_types: 55 reduction.register(view_type, rebuild_as_list) 56 57# 58# Type for identifying shared objects 59# 60 61class Token(object): 62 ''' 63 Type to uniquely identify a shared object 64 ''' 65 __slots__ = ('typeid', 'address', 'id') 66 67 def __init__(self, typeid, address, id): 68 (self.typeid, self.address, self.id) = (typeid, address, id) 69 70 def __getstate__(self): 71 return (self.typeid, self.address, self.id) 72 73 def __setstate__(self, state): 74 (self.typeid, self.address, self.id) = state 75 76 def __repr__(self): 77 return '%s(typeid=%r, address=%r, id=%r)' % \ 78 (self.__class__.__name__, self.typeid, self.address, self.id) 79 80# 81# Function for communication with a manager's server process 82# 83 84def dispatch(c, id, methodname, args=(), kwds={}): 85 ''' 86 Send a message to manager using connection `c` and return response 87 ''' 88 c.send((id, methodname, args, kwds)) 89 kind, result = c.recv() 90 if kind == '#RETURN': 91 return result 92 raise convert_to_error(kind, result) 93 94def convert_to_error(kind, result): 95 if kind == '#ERROR': 96 return result 97 elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'): 98 if not isinstance(result, str): 99 raise TypeError( 100 "Result {0!r} (kind '{1}') type is {2}, not str".format( 101 result, kind, type(result))) 102 if kind == '#UNSERIALIZABLE': 103 return RemoteError('Unserializable message: %s\n' % result) 104 else: 105 return RemoteError(result) 106 else: 107 return ValueError('Unrecognized message type {!r}'.format(kind)) 108 109class RemoteError(Exception): 110 def __str__(self): 111 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 112 113# 114# Functions for finding the method names of an object 115# 116 117def all_methods(obj): 118 ''' 119 Return a list of names of methods of `obj` 120 ''' 121 temp = [] 122 for name in dir(obj): 123 func = getattr(obj, name) 124 if callable(func): 125 temp.append(name) 126 return temp 127 128def public_methods(obj): 129 ''' 130 Return a list of names of methods of `obj` which do not start with '_' 131 ''' 132 return [name for name in all_methods(obj) if name[0] != '_'] 133 134# 135# Server which is run in a process controlled by a manager 136# 137 138class Server(object): 139 ''' 140 Server class which runs in a process controlled by a manager object 141 ''' 142 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 143 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 144 145 def __init__(self, registry, address, authkey, serializer): 146 if not isinstance(authkey, bytes): 147 raise TypeError( 148 "Authkey {0!r} is type {1!s}, not bytes".format( 149 authkey, type(authkey))) 150 self.registry = registry 151 self.authkey = process.AuthenticationString(authkey) 152 Listener, Client = listener_client[serializer] 153 154 # do authentication later 155 self.listener = Listener(address=address, backlog=16) 156 self.address = self.listener.address 157 158 self.id_to_obj = {'0': (None, ())} 159 self.id_to_refcount = {} 160 self.id_to_local_proxy_obj = {} 161 self.mutex = threading.Lock() 162 163 def serve_forever(self): 164 ''' 165 Run the server forever 166 ''' 167 self.stop_event = threading.Event() 168 process.current_process()._manager_server = self 169 try: 170 accepter = threading.Thread(target=self.accepter) 171 accepter.daemon = True 172 accepter.start() 173 try: 174 while not self.stop_event.is_set(): 175 self.stop_event.wait(1) 176 except (KeyboardInterrupt, SystemExit): 177 pass 178 finally: 179 if sys.stdout != sys.__stdout__: # what about stderr? 180 util.debug('resetting stdout, stderr') 181 sys.stdout = sys.__stdout__ 182 sys.stderr = sys.__stderr__ 183 sys.exit(0) 184 185 def accepter(self): 186 while True: 187 try: 188 c = self.listener.accept() 189 except OSError: 190 continue 191 t = threading.Thread(target=self.handle_request, args=(c,)) 192 t.daemon = True 193 t.start() 194 195 def handle_request(self, c): 196 ''' 197 Handle a new connection 198 ''' 199 funcname = result = request = None 200 try: 201 connection.deliver_challenge(c, self.authkey) 202 connection.answer_challenge(c, self.authkey) 203 request = c.recv() 204 ignore, funcname, args, kwds = request 205 assert funcname in self.public, '%r unrecognized' % funcname 206 func = getattr(self, funcname) 207 except Exception: 208 msg = ('#TRACEBACK', format_exc()) 209 else: 210 try: 211 result = func(c, *args, **kwds) 212 except Exception: 213 msg = ('#TRACEBACK', format_exc()) 214 else: 215 msg = ('#RETURN', result) 216 try: 217 c.send(msg) 218 except Exception as e: 219 try: 220 c.send(('#TRACEBACK', format_exc())) 221 except Exception: 222 pass 223 util.info('Failure to send message: %r', msg) 224 util.info(' ... request was %r', request) 225 util.info(' ... exception was %r', e) 226 227 c.close() 228 229 def serve_client(self, conn): 230 ''' 231 Handle requests from the proxies in a particular process/thread 232 ''' 233 util.debug('starting server thread to service %r', 234 threading.current_thread().name) 235 236 recv = conn.recv 237 send = conn.send 238 id_to_obj = self.id_to_obj 239 240 while not self.stop_event.is_set(): 241 242 try: 243 methodname = obj = None 244 request = recv() 245 ident, methodname, args, kwds = request 246 try: 247 obj, exposed, gettypeid = id_to_obj[ident] 248 except KeyError as ke: 249 try: 250 obj, exposed, gettypeid = \ 251 self.id_to_local_proxy_obj[ident] 252 except KeyError: 253 raise ke 254 255 if methodname not in exposed: 256 raise AttributeError( 257 'method %r of %r object is not in exposed=%r' % 258 (methodname, type(obj), exposed) 259 ) 260 261 function = getattr(obj, methodname) 262 263 try: 264 res = function(*args, **kwds) 265 except Exception as e: 266 msg = ('#ERROR', e) 267 else: 268 typeid = gettypeid and gettypeid.get(methodname, None) 269 if typeid: 270 rident, rexposed = self.create(conn, typeid, res) 271 token = Token(typeid, self.address, rident) 272 msg = ('#PROXY', (rexposed, token)) 273 else: 274 msg = ('#RETURN', res) 275 276 except AttributeError: 277 if methodname is None: 278 msg = ('#TRACEBACK', format_exc()) 279 else: 280 try: 281 fallback_func = self.fallback_mapping[methodname] 282 result = fallback_func( 283 self, conn, ident, obj, *args, **kwds 284 ) 285 msg = ('#RETURN', result) 286 except Exception: 287 msg = ('#TRACEBACK', format_exc()) 288 289 except EOFError: 290 util.debug('got EOF -- exiting thread serving %r', 291 threading.current_thread().name) 292 sys.exit(0) 293 294 except Exception: 295 msg = ('#TRACEBACK', format_exc()) 296 297 try: 298 try: 299 send(msg) 300 except Exception: 301 send(('#UNSERIALIZABLE', format_exc())) 302 except Exception as e: 303 util.info('exception in thread serving %r', 304 threading.current_thread().name) 305 util.info(' ... message was %r', msg) 306 util.info(' ... exception was %r', e) 307 conn.close() 308 sys.exit(1) 309 310 def fallback_getvalue(self, conn, ident, obj): 311 return obj 312 313 def fallback_str(self, conn, ident, obj): 314 return str(obj) 315 316 def fallback_repr(self, conn, ident, obj): 317 return repr(obj) 318 319 fallback_mapping = { 320 '__str__':fallback_str, 321 '__repr__':fallback_repr, 322 '#GETVALUE':fallback_getvalue 323 } 324 325 def dummy(self, c): 326 pass 327 328 def debug_info(self, c): 329 ''' 330 Return some info --- useful to spot problems with refcounting 331 ''' 332 # Perhaps include debug info about 'c'? 333 with self.mutex: 334 result = [] 335 keys = list(self.id_to_refcount.keys()) 336 keys.sort() 337 for ident in keys: 338 if ident != '0': 339 result.append(' %s: refcount=%s\n %s' % 340 (ident, self.id_to_refcount[ident], 341 str(self.id_to_obj[ident][0])[:75])) 342 return '\n'.join(result) 343 344 def number_of_objects(self, c): 345 ''' 346 Number of shared objects 347 ''' 348 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' 349 return len(self.id_to_refcount) 350 351 def shutdown(self, c): 352 ''' 353 Shutdown this process 354 ''' 355 try: 356 util.debug('manager received shutdown message') 357 c.send(('#RETURN', None)) 358 except: 359 import traceback 360 traceback.print_exc() 361 finally: 362 self.stop_event.set() 363 364 def create(self, c, typeid, /, *args, **kwds): 365 ''' 366 Create a new shared object and return its id 367 ''' 368 with self.mutex: 369 callable, exposed, method_to_typeid, proxytype = \ 370 self.registry[typeid] 371 372 if callable is None: 373 if kwds or (len(args) != 1): 374 raise ValueError( 375 "Without callable, must have one non-keyword argument") 376 obj = args[0] 377 else: 378 obj = callable(*args, **kwds) 379 380 if exposed is None: 381 exposed = public_methods(obj) 382 if method_to_typeid is not None: 383 if not isinstance(method_to_typeid, dict): 384 raise TypeError( 385 "Method_to_typeid {0!r}: type {1!s}, not dict".format( 386 method_to_typeid, type(method_to_typeid))) 387 exposed = list(exposed) + list(method_to_typeid) 388 389 ident = '%x' % id(obj) # convert to string because xmlrpclib 390 # only has 32 bit signed integers 391 util.debug('%r callable returned object with id %r', typeid, ident) 392 393 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 394 if ident not in self.id_to_refcount: 395 self.id_to_refcount[ident] = 0 396 397 self.incref(c, ident) 398 return ident, tuple(exposed) 399 400 def get_methods(self, c, token): 401 ''' 402 Return the methods of the shared object indicated by token 403 ''' 404 return tuple(self.id_to_obj[token.id][1]) 405 406 def accept_connection(self, c, name): 407 ''' 408 Spawn a new thread to serve this connection 409 ''' 410 threading.current_thread().name = name 411 c.send(('#RETURN', None)) 412 self.serve_client(c) 413 414 def incref(self, c, ident): 415 with self.mutex: 416 try: 417 self.id_to_refcount[ident] += 1 418 except KeyError as ke: 419 # If no external references exist but an internal (to the 420 # manager) still does and a new external reference is created 421 # from it, restore the manager's tracking of it from the 422 # previously stashed internal ref. 423 if ident in self.id_to_local_proxy_obj: 424 self.id_to_refcount[ident] = 1 425 self.id_to_obj[ident] = \ 426 self.id_to_local_proxy_obj[ident] 427 obj, exposed, gettypeid = self.id_to_obj[ident] 428 util.debug('Server re-enabled tracking & INCREF %r', ident) 429 else: 430 raise ke 431 432 def decref(self, c, ident): 433 if ident not in self.id_to_refcount and \ 434 ident in self.id_to_local_proxy_obj: 435 util.debug('Server DECREF skipping %r', ident) 436 return 437 438 with self.mutex: 439 if self.id_to_refcount[ident] <= 0: 440 raise AssertionError( 441 "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format( 442 ident, self.id_to_obj[ident], 443 self.id_to_refcount[ident])) 444 self.id_to_refcount[ident] -= 1 445 if self.id_to_refcount[ident] == 0: 446 del self.id_to_refcount[ident] 447 448 if ident not in self.id_to_refcount: 449 # Two-step process in case the object turns out to contain other 450 # proxy objects (e.g. a managed list of managed lists). 451 # Otherwise, deleting self.id_to_obj[ident] would trigger the 452 # deleting of the stored value (another managed object) which would 453 # in turn attempt to acquire the mutex that is already held here. 454 self.id_to_obj[ident] = (None, (), None) # thread-safe 455 util.debug('disposing of obj with id %r', ident) 456 with self.mutex: 457 del self.id_to_obj[ident] 458 459 460# 461# Class to represent state of a manager 462# 463 464class State(object): 465 __slots__ = ['value'] 466 INITIAL = 0 467 STARTED = 1 468 SHUTDOWN = 2 469 470# 471# Mapping from serializer name to Listener and Client types 472# 473 474listener_client = { 475 'pickle' : (connection.Listener, connection.Client), 476 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 477 } 478 479# 480# Definition of BaseManager 481# 482 483class BaseManager(object): 484 ''' 485 Base class for managers 486 ''' 487 _registry = {} 488 _Server = Server 489 490 def __init__(self, address=None, authkey=None, serializer='pickle', 491 ctx=None): 492 if authkey is None: 493 authkey = process.current_process().authkey 494 self._address = address # XXX not final address if eg ('', 0) 495 self._authkey = process.AuthenticationString(authkey) 496 self._state = State() 497 self._state.value = State.INITIAL 498 self._serializer = serializer 499 self._Listener, self._Client = listener_client[serializer] 500 self._ctx = ctx or get_context() 501 502 def get_server(self): 503 ''' 504 Return server object with serve_forever() method and address attribute 505 ''' 506 if self._state.value != State.INITIAL: 507 if self._state.value == State.STARTED: 508 raise ProcessError("Already started server") 509 elif self._state.value == State.SHUTDOWN: 510 raise ProcessError("Manager has shut down") 511 else: 512 raise ProcessError( 513 "Unknown state {!r}".format(self._state.value)) 514 return Server(self._registry, self._address, 515 self._authkey, self._serializer) 516 517 def connect(self): 518 ''' 519 Connect manager object to the server process 520 ''' 521 Listener, Client = listener_client[self._serializer] 522 conn = Client(self._address, authkey=self._authkey) 523 dispatch(conn, None, 'dummy') 524 self._state.value = State.STARTED 525 526 def start(self, initializer=None, initargs=()): 527 ''' 528 Spawn a server process for this manager object 529 ''' 530 if self._state.value != State.INITIAL: 531 if self._state.value == State.STARTED: 532 raise ProcessError("Already started server") 533 elif self._state.value == State.SHUTDOWN: 534 raise ProcessError("Manager has shut down") 535 else: 536 raise ProcessError( 537 "Unknown state {!r}".format(self._state.value)) 538 539 if initializer is not None and not callable(initializer): 540 raise TypeError('initializer must be a callable') 541 542 # pipe over which we will retrieve address of server 543 reader, writer = connection.Pipe(duplex=False) 544 545 # spawn process which runs a server 546 self._process = self._ctx.Process( 547 target=type(self)._run_server, 548 args=(self._registry, self._address, self._authkey, 549 self._serializer, writer, initializer, initargs), 550 ) 551 ident = ':'.join(str(i) for i in self._process._identity) 552 self._process.name = type(self).__name__ + '-' + ident 553 self._process.start() 554 555 # get address of server 556 writer.close() 557 self._address = reader.recv() 558 reader.close() 559 560 # register a finalizer 561 self._state.value = State.STARTED 562 self.shutdown = util.Finalize( 563 self, type(self)._finalize_manager, 564 args=(self._process, self._address, self._authkey, 565 self._state, self._Client), 566 exitpriority=0 567 ) 568 569 @classmethod 570 def _run_server(cls, registry, address, authkey, serializer, writer, 571 initializer=None, initargs=()): 572 ''' 573 Create a server, report its address and run it 574 ''' 575 # bpo-36368: protect server process from KeyboardInterrupt signals 576 signal.signal(signal.SIGINT, signal.SIG_IGN) 577 578 if initializer is not None: 579 initializer(*initargs) 580 581 # create server 582 server = cls._Server(registry, address, authkey, serializer) 583 584 # inform parent process of the server's address 585 writer.send(server.address) 586 writer.close() 587 588 # run the manager 589 util.info('manager serving at %r', server.address) 590 server.serve_forever() 591 592 def _create(self, typeid, /, *args, **kwds): 593 ''' 594 Create a new shared object; return the token and exposed tuple 595 ''' 596 assert self._state.value == State.STARTED, 'server not yet started' 597 conn = self._Client(self._address, authkey=self._authkey) 598 try: 599 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 600 finally: 601 conn.close() 602 return Token(typeid, self._address, id), exposed 603 604 def join(self, timeout=None): 605 ''' 606 Join the manager process (if it has been spawned) 607 ''' 608 if self._process is not None: 609 self._process.join(timeout) 610 if not self._process.is_alive(): 611 self._process = None 612 613 def _debug_info(self): 614 ''' 615 Return some info about the servers shared objects and connections 616 ''' 617 conn = self._Client(self._address, authkey=self._authkey) 618 try: 619 return dispatch(conn, None, 'debug_info') 620 finally: 621 conn.close() 622 623 def _number_of_objects(self): 624 ''' 625 Return the number of shared objects 626 ''' 627 conn = self._Client(self._address, authkey=self._authkey) 628 try: 629 return dispatch(conn, None, 'number_of_objects') 630 finally: 631 conn.close() 632 633 def __enter__(self): 634 if self._state.value == State.INITIAL: 635 self.start() 636 if self._state.value != State.STARTED: 637 if self._state.value == State.INITIAL: 638 raise ProcessError("Unable to start server") 639 elif self._state.value == State.SHUTDOWN: 640 raise ProcessError("Manager has shut down") 641 else: 642 raise ProcessError( 643 "Unknown state {!r}".format(self._state.value)) 644 return self 645 646 def __exit__(self, exc_type, exc_val, exc_tb): 647 self.shutdown() 648 649 @staticmethod 650 def _finalize_manager(process, address, authkey, state, _Client): 651 ''' 652 Shutdown the manager process; will be registered as a finalizer 653 ''' 654 if process.is_alive(): 655 util.info('sending shutdown message to manager') 656 try: 657 conn = _Client(address, authkey=authkey) 658 try: 659 dispatch(conn, None, 'shutdown') 660 finally: 661 conn.close() 662 except Exception: 663 pass 664 665 process.join(timeout=1.0) 666 if process.is_alive(): 667 util.info('manager still alive') 668 if hasattr(process, 'terminate'): 669 util.info('trying to `terminate()` manager process') 670 process.terminate() 671 process.join(timeout=0.1) 672 if process.is_alive(): 673 util.info('manager still alive after terminate') 674 675 state.value = State.SHUTDOWN 676 try: 677 del BaseProxy._address_to_local[address] 678 except KeyError: 679 pass 680 681 @property 682 def address(self): 683 return self._address 684 685 @classmethod 686 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 687 method_to_typeid=None, create_method=True): 688 ''' 689 Register a typeid with the manager type 690 ''' 691 if '_registry' not in cls.__dict__: 692 cls._registry = cls._registry.copy() 693 694 if proxytype is None: 695 proxytype = AutoProxy 696 697 exposed = exposed or getattr(proxytype, '_exposed_', None) 698 699 method_to_typeid = method_to_typeid or \ 700 getattr(proxytype, '_method_to_typeid_', None) 701 702 if method_to_typeid: 703 for key, value in list(method_to_typeid.items()): # isinstance? 704 assert type(key) is str, '%r is not a string' % key 705 assert type(value) is str, '%r is not a string' % value 706 707 cls._registry[typeid] = ( 708 callable, exposed, method_to_typeid, proxytype 709 ) 710 711 if create_method: 712 def temp(self, /, *args, **kwds): 713 util.debug('requesting creation of a shared %r object', typeid) 714 token, exp = self._create(typeid, *args, **kwds) 715 proxy = proxytype( 716 token, self._serializer, manager=self, 717 authkey=self._authkey, exposed=exp 718 ) 719 conn = self._Client(token.address, authkey=self._authkey) 720 dispatch(conn, None, 'decref', (token.id,)) 721 return proxy 722 temp.__name__ = typeid 723 setattr(cls, typeid, temp) 724 725# 726# Subclass of set which get cleared after a fork 727# 728 729class ProcessLocalSet(set): 730 def __init__(self): 731 util.register_after_fork(self, lambda obj: obj.clear()) 732 def __reduce__(self): 733 return type(self), () 734 735# 736# Definition of BaseProxy 737# 738 739class BaseProxy(object): 740 ''' 741 A base for proxies of shared objects 742 ''' 743 _address_to_local = {} 744 _mutex = util.ForkAwareThreadLock() 745 746 def __init__(self, token, serializer, manager=None, 747 authkey=None, exposed=None, incref=True, manager_owned=False): 748 with BaseProxy._mutex: 749 tls_idset = BaseProxy._address_to_local.get(token.address, None) 750 if tls_idset is None: 751 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 752 BaseProxy._address_to_local[token.address] = tls_idset 753 754 # self._tls is used to record the connection used by this 755 # thread to communicate with the manager at token.address 756 self._tls = tls_idset[0] 757 758 # self._idset is used to record the identities of all shared 759 # objects for which the current process owns references and 760 # which are in the manager at token.address 761 self._idset = tls_idset[1] 762 763 self._token = token 764 self._id = self._token.id 765 self._manager = manager 766 self._serializer = serializer 767 self._Client = listener_client[serializer][1] 768 769 # Should be set to True only when a proxy object is being created 770 # on the manager server; primary use case: nested proxy objects. 771 # RebuildProxy detects when a proxy is being created on the manager 772 # and sets this value appropriately. 773 self._owned_by_manager = manager_owned 774 775 if authkey is not None: 776 self._authkey = process.AuthenticationString(authkey) 777 elif self._manager is not None: 778 self._authkey = self._manager._authkey 779 else: 780 self._authkey = process.current_process().authkey 781 782 if incref: 783 self._incref() 784 785 util.register_after_fork(self, BaseProxy._after_fork) 786 787 def _connect(self): 788 util.debug('making connection to manager') 789 name = process.current_process().name 790 if threading.current_thread().name != 'MainThread': 791 name += '|' + threading.current_thread().name 792 conn = self._Client(self._token.address, authkey=self._authkey) 793 dispatch(conn, None, 'accept_connection', (name,)) 794 self._tls.connection = conn 795 796 def _callmethod(self, methodname, args=(), kwds={}): 797 ''' 798 Try to call a method of the referent and return a copy of the result 799 ''' 800 try: 801 conn = self._tls.connection 802 except AttributeError: 803 util.debug('thread %r does not own a connection', 804 threading.current_thread().name) 805 self._connect() 806 conn = self._tls.connection 807 808 conn.send((self._id, methodname, args, kwds)) 809 kind, result = conn.recv() 810 811 if kind == '#RETURN': 812 return result 813 elif kind == '#PROXY': 814 exposed, token = result 815 proxytype = self._manager._registry[token.typeid][-1] 816 token.address = self._token.address 817 proxy = proxytype( 818 token, self._serializer, manager=self._manager, 819 authkey=self._authkey, exposed=exposed 820 ) 821 conn = self._Client(token.address, authkey=self._authkey) 822 dispatch(conn, None, 'decref', (token.id,)) 823 return proxy 824 raise convert_to_error(kind, result) 825 826 def _getvalue(self): 827 ''' 828 Get a copy of the value of the referent 829 ''' 830 return self._callmethod('#GETVALUE') 831 832 def _incref(self): 833 if self._owned_by_manager: 834 util.debug('owned_by_manager skipped INCREF of %r', self._token.id) 835 return 836 837 conn = self._Client(self._token.address, authkey=self._authkey) 838 dispatch(conn, None, 'incref', (self._id,)) 839 util.debug('INCREF %r', self._token.id) 840 841 self._idset.add(self._id) 842 843 state = self._manager and self._manager._state 844 845 self._close = util.Finalize( 846 self, BaseProxy._decref, 847 args=(self._token, self._authkey, state, 848 self._tls, self._idset, self._Client), 849 exitpriority=10 850 ) 851 852 @staticmethod 853 def _decref(token, authkey, state, tls, idset, _Client): 854 idset.discard(token.id) 855 856 # check whether manager is still alive 857 if state is None or state.value == State.STARTED: 858 # tell manager this process no longer cares about referent 859 try: 860 util.debug('DECREF %r', token.id) 861 conn = _Client(token.address, authkey=authkey) 862 dispatch(conn, None, 'decref', (token.id,)) 863 except Exception as e: 864 util.debug('... decref failed %s', e) 865 866 else: 867 util.debug('DECREF %r -- manager already shutdown', token.id) 868 869 # check whether we can close this thread's connection because 870 # the process owns no more references to objects for this manager 871 if not idset and hasattr(tls, 'connection'): 872 util.debug('thread %r has no more proxies so closing conn', 873 threading.current_thread().name) 874 tls.connection.close() 875 del tls.connection 876 877 def _after_fork(self): 878 self._manager = None 879 try: 880 self._incref() 881 except Exception as e: 882 # the proxy may just be for a manager which has shutdown 883 util.info('incref failed: %s' % e) 884 885 def __reduce__(self): 886 kwds = {} 887 if get_spawning_popen() is not None: 888 kwds['authkey'] = self._authkey 889 890 if getattr(self, '_isauto', False): 891 kwds['exposed'] = self._exposed_ 892 return (RebuildProxy, 893 (AutoProxy, self._token, self._serializer, kwds)) 894 else: 895 return (RebuildProxy, 896 (type(self), self._token, self._serializer, kwds)) 897 898 def __deepcopy__(self, memo): 899 return self._getvalue() 900 901 def __repr__(self): 902 return '<%s object, typeid %r at %#x>' % \ 903 (type(self).__name__, self._token.typeid, id(self)) 904 905 def __str__(self): 906 ''' 907 Return representation of the referent (or a fall-back if that fails) 908 ''' 909 try: 910 return self._callmethod('__repr__') 911 except Exception: 912 return repr(self)[:-1] + "; '__str__()' failed>" 913 914# 915# Function used for unpickling 916# 917 918def RebuildProxy(func, token, serializer, kwds): 919 ''' 920 Function used for unpickling proxy objects. 921 ''' 922 server = getattr(process.current_process(), '_manager_server', None) 923 if server and server.address == token.address: 924 util.debug('Rebuild a proxy owned by manager, token=%r', token) 925 kwds['manager_owned'] = True 926 if token.id not in server.id_to_local_proxy_obj: 927 server.id_to_local_proxy_obj[token.id] = \ 928 server.id_to_obj[token.id] 929 incref = ( 930 kwds.pop('incref', True) and 931 not getattr(process.current_process(), '_inheriting', False) 932 ) 933 return func(token, serializer, incref=incref, **kwds) 934 935# 936# Functions to create proxies and proxy types 937# 938 939def MakeProxyType(name, exposed, _cache={}): 940 ''' 941 Return a proxy type whose methods are given by `exposed` 942 ''' 943 exposed = tuple(exposed) 944 try: 945 return _cache[(name, exposed)] 946 except KeyError: 947 pass 948 949 dic = {} 950 951 for meth in exposed: 952 exec('''def %s(self, /, *args, **kwds): 953 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) 954 955 ProxyType = type(name, (BaseProxy,), dic) 956 ProxyType._exposed_ = exposed 957 _cache[(name, exposed)] = ProxyType 958 return ProxyType 959 960 961def AutoProxy(token, serializer, manager=None, authkey=None, 962 exposed=None, incref=True): 963 ''' 964 Return an auto-proxy for `token` 965 ''' 966 _Client = listener_client[serializer][1] 967 968 if exposed is None: 969 conn = _Client(token.address, authkey=authkey) 970 try: 971 exposed = dispatch(conn, None, 'get_methods', (token,)) 972 finally: 973 conn.close() 974 975 if authkey is None and manager is not None: 976 authkey = manager._authkey 977 if authkey is None: 978 authkey = process.current_process().authkey 979 980 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 981 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 982 incref=incref) 983 proxy._isauto = True 984 return proxy 985 986# 987# Types/callables which we will register with SyncManager 988# 989 990class Namespace(object): 991 def __init__(self, /, **kwds): 992 self.__dict__.update(kwds) 993 def __repr__(self): 994 items = list(self.__dict__.items()) 995 temp = [] 996 for name, value in items: 997 if not name.startswith('_'): 998 temp.append('%s=%r' % (name, value)) 999 temp.sort() 1000 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) 1001 1002class Value(object): 1003 def __init__(self, typecode, value, lock=True): 1004 self._typecode = typecode 1005 self._value = value 1006 def get(self): 1007 return self._value 1008 def set(self, value): 1009 self._value = value 1010 def __repr__(self): 1011 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 1012 value = property(get, set) 1013 1014def Array(typecode, sequence, lock=True): 1015 return array.array(typecode, sequence) 1016 1017# 1018# Proxy types used by SyncManager 1019# 1020 1021class IteratorProxy(BaseProxy): 1022 _exposed_ = ('__next__', 'send', 'throw', 'close') 1023 def __iter__(self): 1024 return self 1025 def __next__(self, *args): 1026 return self._callmethod('__next__', args) 1027 def send(self, *args): 1028 return self._callmethod('send', args) 1029 def throw(self, *args): 1030 return self._callmethod('throw', args) 1031 def close(self, *args): 1032 return self._callmethod('close', args) 1033 1034 1035class AcquirerProxy(BaseProxy): 1036 _exposed_ = ('acquire', 'release') 1037 def acquire(self, blocking=True, timeout=None): 1038 args = (blocking,) if timeout is None else (blocking, timeout) 1039 return self._callmethod('acquire', args) 1040 def release(self): 1041 return self._callmethod('release') 1042 def __enter__(self): 1043 return self._callmethod('acquire') 1044 def __exit__(self, exc_type, exc_val, exc_tb): 1045 return self._callmethod('release') 1046 1047 1048class ConditionProxy(AcquirerProxy): 1049 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 1050 def wait(self, timeout=None): 1051 return self._callmethod('wait', (timeout,)) 1052 def notify(self, n=1): 1053 return self._callmethod('notify', (n,)) 1054 def notify_all(self): 1055 return self._callmethod('notify_all') 1056 def wait_for(self, predicate, timeout=None): 1057 result = predicate() 1058 if result: 1059 return result 1060 if timeout is not None: 1061 endtime = time.monotonic() + timeout 1062 else: 1063 endtime = None 1064 waittime = None 1065 while not result: 1066 if endtime is not None: 1067 waittime = endtime - time.monotonic() 1068 if waittime <= 0: 1069 break 1070 self.wait(waittime) 1071 result = predicate() 1072 return result 1073 1074 1075class EventProxy(BaseProxy): 1076 _exposed_ = ('is_set', 'set', 'clear', 'wait') 1077 def is_set(self): 1078 return self._callmethod('is_set') 1079 def set(self): 1080 return self._callmethod('set') 1081 def clear(self): 1082 return self._callmethod('clear') 1083 def wait(self, timeout=None): 1084 return self._callmethod('wait', (timeout,)) 1085 1086 1087class BarrierProxy(BaseProxy): 1088 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') 1089 def wait(self, timeout=None): 1090 return self._callmethod('wait', (timeout,)) 1091 def abort(self): 1092 return self._callmethod('abort') 1093 def reset(self): 1094 return self._callmethod('reset') 1095 @property 1096 def parties(self): 1097 return self._callmethod('__getattribute__', ('parties',)) 1098 @property 1099 def n_waiting(self): 1100 return self._callmethod('__getattribute__', ('n_waiting',)) 1101 @property 1102 def broken(self): 1103 return self._callmethod('__getattribute__', ('broken',)) 1104 1105 1106class NamespaceProxy(BaseProxy): 1107 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1108 def __getattr__(self, key): 1109 if key[0] == '_': 1110 return object.__getattribute__(self, key) 1111 callmethod = object.__getattribute__(self, '_callmethod') 1112 return callmethod('__getattribute__', (key,)) 1113 def __setattr__(self, key, value): 1114 if key[0] == '_': 1115 return object.__setattr__(self, key, value) 1116 callmethod = object.__getattribute__(self, '_callmethod') 1117 return callmethod('__setattr__', (key, value)) 1118 def __delattr__(self, key): 1119 if key[0] == '_': 1120 return object.__delattr__(self, key) 1121 callmethod = object.__getattribute__(self, '_callmethod') 1122 return callmethod('__delattr__', (key,)) 1123 1124 1125class ValueProxy(BaseProxy): 1126 _exposed_ = ('get', 'set') 1127 def get(self): 1128 return self._callmethod('get') 1129 def set(self, value): 1130 return self._callmethod('set', (value,)) 1131 value = property(get, set) 1132 1133 __class_getitem__ = classmethod(types.GenericAlias) 1134 1135 1136BaseListProxy = MakeProxyType('BaseListProxy', ( 1137 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', 1138 '__mul__', '__reversed__', '__rmul__', '__setitem__', 1139 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1140 'reverse', 'sort', '__imul__' 1141 )) 1142class ListProxy(BaseListProxy): 1143 def __iadd__(self, value): 1144 self._callmethod('extend', (value,)) 1145 return self 1146 def __imul__(self, value): 1147 self._callmethod('__imul__', (value,)) 1148 return self 1149 1150 1151DictProxy = MakeProxyType('DictProxy', ( 1152 '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__', 1153 '__setitem__', 'clear', 'copy', 'get', 'items', 1154 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1155 )) 1156DictProxy._method_to_typeid_ = { 1157 '__iter__': 'Iterator', 1158 } 1159 1160 1161ArrayProxy = MakeProxyType('ArrayProxy', ( 1162 '__len__', '__getitem__', '__setitem__' 1163 )) 1164 1165 1166BasePoolProxy = MakeProxyType('PoolProxy', ( 1167 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1168 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', 1169 )) 1170BasePoolProxy._method_to_typeid_ = { 1171 'apply_async': 'AsyncResult', 1172 'map_async': 'AsyncResult', 1173 'starmap_async': 'AsyncResult', 1174 'imap': 'Iterator', 1175 'imap_unordered': 'Iterator' 1176 } 1177class PoolProxy(BasePoolProxy): 1178 def __enter__(self): 1179 return self 1180 def __exit__(self, exc_type, exc_val, exc_tb): 1181 self.terminate() 1182 1183# 1184# Definition of SyncManager 1185# 1186 1187class SyncManager(BaseManager): 1188 ''' 1189 Subclass of `BaseManager` which supports a number of shared object types. 1190 1191 The types registered are those intended for the synchronization 1192 of threads, plus `dict`, `list` and `Namespace`. 1193 1194 The `multiprocessing.Manager()` function creates started instances of 1195 this class. 1196 ''' 1197 1198SyncManager.register('Queue', queue.Queue) 1199SyncManager.register('JoinableQueue', queue.Queue) 1200SyncManager.register('Event', threading.Event, EventProxy) 1201SyncManager.register('Lock', threading.Lock, AcquirerProxy) 1202SyncManager.register('RLock', threading.RLock, AcquirerProxy) 1203SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1204SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1205 AcquirerProxy) 1206SyncManager.register('Condition', threading.Condition, ConditionProxy) 1207SyncManager.register('Barrier', threading.Barrier, BarrierProxy) 1208SyncManager.register('Pool', pool.Pool, PoolProxy) 1209SyncManager.register('list', list, ListProxy) 1210SyncManager.register('dict', dict, DictProxy) 1211SyncManager.register('Value', Value, ValueProxy) 1212SyncManager.register('Array', Array, ArrayProxy) 1213SyncManager.register('Namespace', Namespace, NamespaceProxy) 1214 1215# types returned by methods of PoolProxy 1216SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1217SyncManager.register('AsyncResult', create_method=False) 1218 1219# 1220# Definition of SharedMemoryManager and SharedMemoryServer 1221# 1222 1223if HAS_SHMEM: 1224 class _SharedMemoryTracker: 1225 "Manages one or more shared memory segments." 1226 1227 def __init__(self, name, segment_names=[]): 1228 self.shared_memory_context_name = name 1229 self.segment_names = segment_names 1230 1231 def register_segment(self, segment_name): 1232 "Adds the supplied shared memory block name to tracker." 1233 util.debug(f"Register segment {segment_name!r} in pid {getpid()}") 1234 self.segment_names.append(segment_name) 1235 1236 def destroy_segment(self, segment_name): 1237 """Calls unlink() on the shared memory block with the supplied name 1238 and removes it from the list of blocks being tracked.""" 1239 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") 1240 self.segment_names.remove(segment_name) 1241 segment = shared_memory.SharedMemory(segment_name) 1242 segment.close() 1243 segment.unlink() 1244 1245 def unlink(self): 1246 "Calls destroy_segment() on all tracked shared memory blocks." 1247 for segment_name in self.segment_names[:]: 1248 self.destroy_segment(segment_name) 1249 1250 def __del__(self): 1251 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") 1252 self.unlink() 1253 1254 def __getstate__(self): 1255 return (self.shared_memory_context_name, self.segment_names) 1256 1257 def __setstate__(self, state): 1258 self.__init__(*state) 1259 1260 1261 class SharedMemoryServer(Server): 1262 1263 public = Server.public + \ 1264 ['track_segment', 'release_segment', 'list_segments'] 1265 1266 def __init__(self, *args, **kwargs): 1267 Server.__init__(self, *args, **kwargs) 1268 address = self.address 1269 # The address of Linux abstract namespaces can be bytes 1270 if isinstance(address, bytes): 1271 address = os.fsdecode(address) 1272 self.shared_memory_context = \ 1273 _SharedMemoryTracker(f"shm_{address}_{getpid()}") 1274 util.debug(f"SharedMemoryServer started by pid {getpid()}") 1275 1276 def create(self, c, typeid, /, *args, **kwargs): 1277 """Create a new distributed-shared object (not backed by a shared 1278 memory block) and return its id to be used in a Proxy Object.""" 1279 # Unless set up as a shared proxy, don't make shared_memory_context 1280 # a standard part of kwargs. This makes things easier for supplying 1281 # simple functions. 1282 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): 1283 kwargs['shared_memory_context'] = self.shared_memory_context 1284 return Server.create(self, c, typeid, *args, **kwargs) 1285 1286 def shutdown(self, c): 1287 "Call unlink() on all tracked shared memory, terminate the Server." 1288 self.shared_memory_context.unlink() 1289 return Server.shutdown(self, c) 1290 1291 def track_segment(self, c, segment_name): 1292 "Adds the supplied shared memory block name to Server's tracker." 1293 self.shared_memory_context.register_segment(segment_name) 1294 1295 def release_segment(self, c, segment_name): 1296 """Calls unlink() on the shared memory block with the supplied name 1297 and removes it from the tracker instance inside the Server.""" 1298 self.shared_memory_context.destroy_segment(segment_name) 1299 1300 def list_segments(self, c): 1301 """Returns a list of names of shared memory blocks that the Server 1302 is currently tracking.""" 1303 return self.shared_memory_context.segment_names 1304 1305 1306 class SharedMemoryManager(BaseManager): 1307 """Like SyncManager but uses SharedMemoryServer instead of Server. 1308 1309 It provides methods for creating and returning SharedMemory instances 1310 and for creating a list-like object (ShareableList) backed by shared 1311 memory. It also provides methods that create and return Proxy Objects 1312 that support synchronization across processes (i.e. multi-process-safe 1313 locks and semaphores). 1314 """ 1315 1316 _Server = SharedMemoryServer 1317 1318 def __init__(self, *args, **kwargs): 1319 if os.name == "posix": 1320 # bpo-36867: Ensure the resource_tracker is running before 1321 # launching the manager process, so that concurrent 1322 # shared_memory manipulation both in the manager and in the 1323 # current process does not create two resource_tracker 1324 # processes. 1325 from . import resource_tracker 1326 resource_tracker.ensure_running() 1327 BaseManager.__init__(self, *args, **kwargs) 1328 util.debug(f"{self.__class__.__name__} created by pid {getpid()}") 1329 1330 def __del__(self): 1331 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") 1332 pass 1333 1334 def get_server(self): 1335 'Better than monkeypatching for now; merge into Server ultimately' 1336 if self._state.value != State.INITIAL: 1337 if self._state.value == State.STARTED: 1338 raise ProcessError("Already started SharedMemoryServer") 1339 elif self._state.value == State.SHUTDOWN: 1340 raise ProcessError("SharedMemoryManager has shut down") 1341 else: 1342 raise ProcessError( 1343 "Unknown state {!r}".format(self._state.value)) 1344 return self._Server(self._registry, self._address, 1345 self._authkey, self._serializer) 1346 1347 def SharedMemory(self, size): 1348 """Returns a new SharedMemory instance with the specified size in 1349 bytes, to be tracked by the manager.""" 1350 with self._Client(self._address, authkey=self._authkey) as conn: 1351 sms = shared_memory.SharedMemory(None, create=True, size=size) 1352 try: 1353 dispatch(conn, None, 'track_segment', (sms.name,)) 1354 except BaseException as e: 1355 sms.unlink() 1356 raise e 1357 return sms 1358 1359 def ShareableList(self, sequence): 1360 """Returns a new ShareableList instance populated with the values 1361 from the input sequence, to be tracked by the manager.""" 1362 with self._Client(self._address, authkey=self._authkey) as conn: 1363 sl = shared_memory.ShareableList(sequence) 1364 try: 1365 dispatch(conn, None, 'track_segment', (sl.shm.name,)) 1366 except BaseException as e: 1367 sl.shm.unlink() 1368 raise e 1369 return sl 1370