#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import io
import os
import sys
import socket
import struct
import time
import tempfile
import itertools

import _multiprocessing

from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler

try:
    import _winapi
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
    # _winapi is mandatory on Windows; elsewhere its absence just disables
    # the named-pipe code paths below.
    if sys.platform == 'win32':
        raise
    _winapi = None

#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()

# The last matching platform check wins: AF_INET, then AF_UNIX when the
# socket module offers it, then AF_PIPE on Windows.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the monotonic-clock deadline which lies `timeout` seconds ahead
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True if the monotonic-clock deadline `t` has passed
    '''
    return time.monotonic() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family
    '''
    if family == 'AF_INET':
        # port 0 is passed on to bind() unchanged
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' off Windows, and for 'AF_UNIX' on
    Windows when the socket module does not provide it.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    '''
    # Exact type checks (not isinstance): subclasses of tuple/str are
    # rejected with ValueError.
    if type(address) == tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str:
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    '''
    Common behaviour of connection objects.

    Subclasses supply `_close()`, `_send_bytes(buf)`,
    `_recv_bytes(maxsize=None)` and `_poll(timeout)`.
    '''
    # Class-level default so that __del__ is safe even if __init__ raised
    # before the instance attribute was assigned.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        # After an over-long message the read side is unusable: keep a
        # writable connection open for writing only, otherwise close it.
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # mark as closed even if the underlying close failed
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
        if m.itemsize > 1:
            m = memoryview(bytes(m))
        n = len(m)
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            # _recv_bytes() signals "message longer than maxlength" with
            # None; this call always raises OSError.
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                # the complete message is handed back inside the exception
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE(review): this unpickles whatever the peer sent; only use it
        # with peers that are trusted.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set by wait() when its zero-length probe read consumed an empty
        # message; the next _recv_bytes() then returns that empty message.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Deliberately bare: the pending write must be cancelled
                # on any interruption before re-raising.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                # First read at most 128 bytes; longer messages are
                # completed by _get_more_data().
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        # Deliberately bare: cancel the pending read on
                        # any interruption before re-raising.
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            # `ov` holds the first part of a message that did not fit in
            # the initial read; fetch the remainder and return the whole
            # message as a BytesIO.
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        # Loop until every byte of `buf` has been accepted by write().
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        # Read exactly `size` bytes into a BytesIO.  End of file before the
        # first byte raises EOFError; end of file part way through raises
        # OSError.
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        n = len(buf)
        # For wire compatibility with 3.2 and lower
        header = struct.pack("!i", n)
        if n > 16384:
            # The payload is large so Nagle's algorithm won't be triggered
            # and we'd better avoid the cost of concatenation.
            self._send(header)
            self._send(buf)
        else:
            # Issue #20540: concatenate before sending, to avoid delays due
            # to Nagle's algorithm on a TCP socket.
            # Also note we want to avoid sending a 0-length buffer separately,
            # to avoid "broken pipe" errors if the other end closed the pipe.
            self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        # Messages are framed as a 4-byte network-order signed length
        # followed by the payload.  Returns None (without reading the
        # payload) when the announced length exceeds `maxsize`.
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate the key before acquiring any OS resource, so a bad
        # argument cannot leave a bound socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.

        Raises OSError if the listener is closed.  When an authkey was
        given, the challenge/response handshake is performed first and its
        errors (e.g. AuthenticationError) propagate after the new
        connection has been closed.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # `is not None` (not truthiness) so that an empty key is handled
        # the same way as in Client().
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # do not leave a half-authenticated connection open
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # clear the reference first so that close() is idempotent
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    Raises TypeError if `authkey` is neither None nor bytes.  When an
    authkey is given the challenge/response handshake is performed; if it
    fails the connection is closed and the error propagates.
    '''
    # Validate the key before connecting so that a bad argument does not
    # leave an open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            c.close()
            raise

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            # detach() hands the descriptor over to the Connection
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # remove the socket file when the listener is closed or
            # finalized
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    with socket.socket( getattr(socket, family) ) as s:
        s.setblocking(True)
        s.connect(address)
        # detach() hands the descriptor over to the Connection, so leaving
        # the `with` block does not close it
        return Connection(s.detach())

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            # Queue a fresh pipe instance before taking the oldest one, so
            # the queue is never empty.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    # Deliberately bare: cancel the pending connect and
                    # release the handle on any interruption.
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        # Retry while the pipe is busy or the wait timed out, until the
        # deadline `t` passes; any other error propagates immediately.
        while True:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes in reply.  Sends WELCOME on success;
    sends FAILURE and raises AuthenticationError otherwise.  Raises
    ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # The digest algorithm is part of the wire protocol and must match
    # answer_challenge().
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so the time taken does not reveal how much
    # of a forged digest was correct.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by deliver_challenge() on `connection`

    Raises AuthenticationError if the received message is not a challenge
    or if the peer does not reply with WELCOME.  Raises ValueError if
    `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # Explicit check rather than `assert`, which is stripped under -O.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given dumps/loads
    functions; the remaining listed methods are forwarded to `conn`.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        s = self._conn.recv_bytes()
        return self._loads(s)

# NOTE: `xmlrpclib` is a module global bound lazily by XmlListener.accept()
# and XmlClient(); the two helpers below must only run after one of those.

def _xml_dumps(obj):
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')

def _xml_loads(s):
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj

class XmlListener(Listener):
    def accept(self):
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)

#
# Wait
#

if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            # only the handles after the signalled one remain to be
            # checked, and from now on without blocking
            L = L[res+1:]
            timeout = 0
        return ready

    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        # convert seconds to whole milliseconds
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    # no fileno(): treat the object itself as a handle
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).
    if hasattr(selectors, 'PollSelector'):
        _WaitSelector = selectors.PollSelector
    else:
        _WaitSelector = selectors.SelectSelector

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            if timeout is not None:
                deadline = time.monotonic() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # select() returned nothing: retry with the time that
                    # is left, or give up once the deadline has passed
                    if timeout is not None:
                        timeout = deadline - time.monotonic()
                        if timeout < 0:
                            return ready

#
# Make connection and socket objects sharable if possible
#

if sys.platform == 'win32':
    def reduce_connection(conn):
        handle = conn.fileno()
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
            from . import resource_sharer
            ds = resource_sharer.DupSocket(s)
            return rebuild_connection, (ds, conn.readable, conn.writable)
    def rebuild_connection(ds, readable, writable):
        sock = ds.detach()
        return Connection(sock.detach(), readable, writable)
    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
        dh = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        handle = dh.detach()
        return PipeConnection(handle, readable, writable)
    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        df = reduction.DupFd(conn.fileno())
        return rebuild_connection, (df, conn.readable, conn.writable)
    def rebuild_connection(df, readable, writable):
        fd = df.detach()
        return Connection(fd, readable, writable)
    reduction.register(Connection, reduce_connection)