#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import io
import os
import sys
import socket
import struct
import time
import tempfile
import itertools

import _multiprocessing

from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler

try:
    import _winapi
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
    if sys.platform == 'win32':
        raise
    _winapi = None

#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()

default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return a deadline `timeout` seconds from now, for `_check_timeout()`.

    The deadline is expressed on the monotonic clock so that it is not
    affected by adjustments of the system (wall) clock.
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True if the deadline `t` returned by `_init_timeout()` has passed.
    '''
    return time.monotonic() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family
    '''
    if family == 'AF_INET':
        # Port 0 lets the operating system choose a free port on bind.
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    '''
    if type(address) == tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str:
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    '''
    Common behaviour of connection objects.

    Subclasses provide `_close()`, `_send_bytes()`, `_recv_bytes()` and
    `_poll()`; this class adds state checks, argument validation and
    pickling of objects on top of them.
    '''
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        # Stop further reads: a connection that can still write is only
        # marked unreadable, a read-only one is closed.  Always raises.
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Mark as closed even if the underlying close failed.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            # _recv_bytes() returning None means the message was too long;
            # _bad_message_length() always raises.
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE: this unpickles whatever the peer sent; unpickling data from
        # an untrusted peer is unsafe.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set by wait() when a zero length message has already been consumed
        # from the pipe; the next _recv_bytes() then returns an empty buffer.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Deliberately bare: the pending write must be cancelled
                # whatever interrupted the wait, then the error re-raised.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        # Deliberately bare: cancel the pending read
                        # whatever interrupted the wait, then re-raise.
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            # `ov` holds the first part of a message that did not fit in the
            # initial read; read the remainder and return the whole message.
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        # Keep writing until the whole buffer has been accepted; a single
        # write may take only part of it.
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        # Read exactly `size` bytes and return them in a BytesIO.
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                # End of file: a clean EOF only if nothing was read yet.
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        n = len(buf)
        # For wire compatibility with 3.2 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        # Each message is a 4-byte "!i" length header followed by the payload.
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if maxsize is not None and size > maxsize:
            # Signals "message too long" to recv_bytes().
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)

        # Validate authkey before binding, so that a bad argument does not
        # leave an open socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # NOTE(review): a falsy authkey (e.g. b'') skips authentication here,
        # whereas Client() authenticates whenever authkey is not None.
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            self._listener = None
            listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`
    '''
    family = family or address_type(address)
    _validate_family(family)

    # Validate authkey before connecting, so that a bad argument does not
    # leave an open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            # c1 is the read-only end, c2 the write-only end.
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is closed or
            # finalized.
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept a connection and return it wrapped in a `Connection`.
        '''
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        '''
        Close the listening socket and run the pending unlink, if any.
        '''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    with socket.socket( getattr(socket, family) ) as s:
        s.setblocking(True)
        s.connect(address)
        return Connection(s.detach())

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # close() is the finalizer itself: calling it closes every
            # handle still in the queue.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            # Queue a fresh pipe instance before taking the oldest one, so
            # the queue always holds a handle for the next accept().
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    res = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    # Deliberately bare: cancel the pending connect and
                    # release the handle whatever interrupted the wait.
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                # Retry on timeout/busy until the overall deadline passes.
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break
        else:
            raise

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE followed by a random message, then expects the HMAC
    digest of that message keyed with `authkey`.  Replies WELCOME on a
    match; otherwise replies FAILURE and raises AuthenticationError.
    '''
    import hmac
    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so that the time taken does not reveal how
    # much of the digest matched.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by `deliver_challenge()` on `connection`.

    Replies with the HMAC digest of the challenge message keyed with
    `authkey`, and raises AuthenticationError unless the peer then
    answers WELCOME.
    '''
    import hmac
    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given `dumps`/`loads`
    functions; the byte-level methods are taken from the connection as is.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        s = self._conn.recv_bytes()
        return self._loads(s)

# `xmlrpclib` is bound as a module global by XmlListener.accept() or
# XmlClient() before these two helpers are used.
def _xml_dumps(obj):
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')

def _xml_loads(s):
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj

class XmlListener(Listener):
    def accept(self):
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)

#
# Wait
#

if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            # Only the handles after the signalled one are re-checked, and
            # without blocking.
            L = L[res+1:]
            timeout = 0
        return ready

    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        # Convert the timeout from seconds to whole milliseconds.
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    # No fileno(): the object itself is used as the handle.
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        # Something is already ready: do not block below.
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).
    if hasattr(selectors, 'PollSelector'):
        _WaitSelector = selectors.PollSelector
    else:
        _WaitSelector = selectors.SelectSelector

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            if timeout is not None:
                # Monotonic clock: the deadline must not move when the
                # system (wall) clock is adjusted.
                deadline = time.monotonic() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # Nothing ready: retry with the remaining time, or give
                    # up (returning the empty list) once the deadline passed.
                    if timeout is not None:
                        timeout = deadline - time.monotonic()
                        if timeout < 0:
                            return ready

#
# Make connection and socket objects sharable if possible
#

if sys.platform == 'win32':
    def reduce_connection(conn):
        handle = conn.fileno()
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
            from . import resource_sharer
            ds = resource_sharer.DupSocket(s)
            return rebuild_connection, (ds, conn.readable, conn.writable)
    def rebuild_connection(ds, readable, writable):
        sock = ds.detach()
        return Connection(sock.detach(), readable, writable)
    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
        dh = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        handle = dh.detach()
        return PipeConnection(handle, readable, writable)
    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        df = reduction.DupFd(conn.fileno())
        return rebuild_connection, (df, conn.readable, conn.writable)
    def rebuild_connection(df, readable, writable):
        fd = df.detach()
        return Connection(fd, readable, writable)
    reduction.register(Connection, reduce_connection)