"""Abstract Transport class."""

try:
    from asyncio import compat
except ImportError:
    # asyncio.compat is not available on Python 3.7 and later.  There,
    # WriteTransport.writelines() falls back to bytes.join(), which is
    # what compat.flatten_list_bytes() does on Python 3.4+.
    compat = None

__all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport',
           'Transport', 'DatagramTransport', 'SubprocessTransport',
           ]


class BaseTransport:
    """Base class for transports."""

    def __init__(self, extra=None):
        """Store the optional *extra* info mapping (defaults to {})."""
        if extra is None:
            extra = {}
        self._extra = extra

    def get_extra_info(self, name, default=None):
        """Get optional transport information.

        Return the value stored under *name* in the extra info mapping
        passed to the constructor, or *default* if there is none.
        """
        return self._extra.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        raise NotImplementedError

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Set a new protocol."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the current protocol."""
        raise NotImplementedError


class ReadTransport(BaseTransport):
    """Interface for read-only transports."""

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        raise NotImplementedError


class WriteTransport(BaseTransport):
    """Interface for write-only transports."""

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer."""
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation concatenates the arguments and
        calls write() once on the result.
        """
        if compat is not None:
            data = compat.flatten_list_bytes(list_of_data)
        else:
            # No asyncio.compat on this Python: join directly.
            data = b''.join(list_of_data)
        self.write(data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError


class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    There may be several implementations, but typically, the user does
    not implement new transports; rather, the platform provides some
    useful transports that are implemented using the platform's best
    practices.

    The user never instantiates a transport directly; they call a
    utility function, passing it a protocol factory and other
    information necessary to create the transport and protocol.  (E.g.
    EventLoop.create_connection() or EventLoop.create_server().)

    The utility function will asynchronously create a transport and a
    protocol and hook them up by calling the protocol's
    connection_made() method, passing it the transport.

    The implementation here raises NotImplementedError for every method
    except get_extra_info(), which reads the extra info mapping, and
    writelines(), which concatenates its arguments and calls write()
    once on the result.
    """


class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports."""

    def sendto(self, data, addr=None):
        """Send data to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        addr is the target socket address.
        If addr is None, use the target address given on transport
        creation.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError


class SubprocessTransport(BaseTransport):
    """Interface for subprocess transports."""

    def get_pid(self):
        """Get subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Get subprocess returncode.

        See also
        http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Get transport for pipe with number fd."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Send signal to subprocess.

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On Posix OSs the method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess()
        is called to stop the subprocess.

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On Posix OSs the function sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
        """
        raise NotImplementedError


class _FlowControlMixin(Transport):
    """All the logic for (write) flow control in a mix-in base class.

    The subclass must implement get_write_buffer_size().  It must call
    _maybe_pause_protocol() whenever the write buffer size increases,
    and _maybe_resume_protocol() whenever it decreases.  It may also
    override set_write_buffer_limits() (e.g. to specify different
    defaults).

    The subclass constructor must call super().__init__(extra, loop)
    with a non-None loop.  This installs the default write buffer
    limits.  The subclass must also provide the self._protocol
    attribute before _maybe_pause_protocol() or
    _maybe_resume_protocol() can reach the protocol; it is read but
    never assigned here.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None, loop=None):
        """Initialize flow-control state with the default limits.

        *loop* is required despite its default of None; its
        call_exception_handler() is used to report protocol failures.
        """
        super().__init__(extra)
        assert loop is not None
        self._loop = loop
        self._protocol_paused = False
        # Use the private setter: the public one also checks the buffer
        # size, which the subclass may not be ready to report yet.
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer exceeds the high-water mark.

        pause_writing() is called at most once until the protocol has
        been resumed again.
        """
        size = self.get_write_buffer_size()
        if size <= self._high_water:
            return
        if not self._protocol_paused:
            self._protocol_paused = True
            try:
                self._protocol.pause_writing()
            except Exception as exc:
                # A failing protocol callback is reported to the loop's
                # exception handler instead of propagating to the caller.
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer is at or below low-water."""
        if (self._protocol_paused and
                self.get_write_buffer_size() <= self._low_water):
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                # Same reporting policy as in _maybe_pause_protocol().
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def get_write_buffer_limits(self):
        """Return the current limits as a (low, high) tuple."""
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the water marks without touching the protocol.

        Defaults: high is 64 KiB when neither limit is given, or 4*low
        when only low is given; low is high // 4 when not given.

        Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            if low is None:
                high = 64*1024
            else:
                high = 4*low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the water marks, then pause the protocol if now over high."""
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current size of the write buffer (subclass hook)."""
        raise NotImplementedError