#
# Analogue of `multiprocessing.connection` which uses queues instead of sockets
#
# multiprocessing/dummy/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe' ]

from queue import Queue


families = [None]


class Listener:
    """Queue-backed listener: clients connect by putting a queue pair on it."""

    def __init__(self, address=None, family=None, backlog=1):
        """Create the backlog queue.

        *address* and *family* are accepted but not used; *backlog* is the
        maximum size of the queue of pending connections.
        """
        self._backlog_queue = Queue(backlog)

    def accept(self):
        """Block until a client connects and return this side's Connection."""
        return Connection(*self._backlog_queue.get())

    def close(self):
        """Drop the backlog queue; `address` returns None afterwards."""
        self._backlog_queue = None

    @property
    def address(self):
        """The backlog queue itself, which is what `Client` expects."""
        return self._backlog_queue

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address):
    """Connect to the listener whose backlog queue is *address*.

    Two fresh queues are created; the pair is handed to the listener in
    swapped order so that each side reads from what the other side writes.
    """
    _in, _out = Queue(), Queue()
    address.put((_out, _in))
    return Connection(_in, _out)


def Pipe(duplex=True):
    """Return two Connections joined by a pair of queues.

    *duplex* is accepted but ignored: both ends can always send and receive.
    """
    a, b = Queue(), Queue()
    return Connection(a, b), Connection(b, a)


class Connection:
    """One end of a queue-backed connection.

    `send`/`send_bytes` are bound to the outgoing queue's `put`, and
    `recv`/`recv_bytes` to the incoming queue's `get`.
    """

    def __init__(self, _in, _out):
        self._out = _out
        self._in = _in
        self.send = self.send_bytes = _out.put
        self.recv = self.recv_bytes = _in.get

    def poll(self, timeout=0.0):
        """Return whether there is data available to be received.

        *timeout* is the maximum number of seconds to wait.  A value of zero
        or less checks once without blocking; ``None`` waits until data
        arrives.
        """
        inbound = self._in
        # The emptiness check and the wait both happen while holding the
        # queue's lock, so a put() cannot slip in between them and have its
        # notification missed.  `_qsize()` is used rather than `qsize()`
        # because the latter would try to re-acquire the lock already held.
        with inbound.not_empty:
            if timeout is not None and timeout <= 0.0:
                return inbound._qsize() > 0
            return inbound.not_empty.wait_for(
                lambda: inbound._qsize() > 0, timeout)

    def close(self):
        """Do nothing; there is no underlying resource to release."""
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()