1"""Abstract Transport class.""" 2 3__all__ = ( 4 'BaseTransport', 'ReadTransport', 'WriteTransport', 5 'Transport', 'DatagramTransport', 'SubprocessTransport', 6) 7 8 9class BaseTransport: 10 """Base class for transports.""" 11 12 def __init__(self, extra=None): 13 if extra is None: 14 extra = {} 15 self._extra = extra 16 17 def get_extra_info(self, name, default=None): 18 """Get optional transport information.""" 19 return self._extra.get(name, default) 20 21 def is_closing(self): 22 """Return True if the transport is closing or closed.""" 23 raise NotImplementedError 24 25 def close(self): 26 """Close the transport. 27 28 Buffered data will be flushed asynchronously. No more data 29 will be received. After all buffered data is flushed, the 30 protocol's connection_lost() method will (eventually) called 31 with None as its argument. 32 """ 33 raise NotImplementedError 34 35 def set_protocol(self, protocol): 36 """Set a new protocol.""" 37 raise NotImplementedError 38 39 def get_protocol(self): 40 """Return the current protocol.""" 41 raise NotImplementedError 42 43 44class ReadTransport(BaseTransport): 45 """Interface for read-only transports.""" 46 47 def is_reading(self): 48 """Return True if the transport is receiving.""" 49 raise NotImplementedError 50 51 def pause_reading(self): 52 """Pause the receiving end. 53 54 No data will be passed to the protocol's data_received() 55 method until resume_reading() is called. 56 """ 57 raise NotImplementedError 58 59 def resume_reading(self): 60 """Resume the receiving end. 61 62 Data received will once again be passed to the protocol's 63 data_received() method. 64 """ 65 raise NotImplementedError 66 67 68class WriteTransport(BaseTransport): 69 """Interface for write-only transports.""" 70 71 def set_write_buffer_limits(self, high=None, low=None): 72 """Set the high- and low-water limits for write flow control. 73 74 These two values control when to call the protocol's 75 pause_writing() and resume_writing() methods. If specified, 76 the low-water limit must be less than or equal to the 77 high-water limit. Neither value can be negative. 78 79 The defaults are implementation-specific. If only the 80 high-water limit is given, the low-water limit defaults to an 81 implementation-specific value less than or equal to the 82 high-water limit. Setting high to zero forces low to zero as 83 well, and causes pause_writing() to be called whenever the 84 buffer becomes non-empty. Setting low to zero causes 85 resume_writing() to be called only once the buffer is empty. 86 Use of zero for either limit is generally sub-optimal as it 87 reduces opportunities for doing I/O and computation 88 concurrently. 89 """ 90 raise NotImplementedError 91 92 def get_write_buffer_size(self): 93 """Return the current size of the write buffer.""" 94 raise NotImplementedError 95 96 def write(self, data): 97 """Write some data bytes to the transport. 98 99 This does not block; it buffers the data and arranges for it 100 to be sent out asynchronously. 101 """ 102 raise NotImplementedError 103 104 def writelines(self, list_of_data): 105 """Write a list (or any iterable) of data bytes to the transport. 106 107 The default implementation concatenates the arguments and 108 calls write() on the result. 109 """ 110 data = b''.join(list_of_data) 111 self.write(data) 112 113 def write_eof(self): 114 """Close the write end after flushing buffered data. 115 116 (This is like typing ^D into a UNIX program reading from stdin.) 117 118 Data may still be received. 119 """ 120 raise NotImplementedError 121 122 def can_write_eof(self): 123 """Return True if this transport supports write_eof(), False if not.""" 124 raise NotImplementedError 125 126 def abort(self): 127 """Close the transport immediately. 128 129 Buffered data will be lost. No more data will be received. 130 The protocol's connection_lost() method will (eventually) be 131 called with None as its argument. 132 """ 133 raise NotImplementedError 134 135 136class Transport(ReadTransport, WriteTransport): 137 """Interface representing a bidirectional transport. 138 139 There may be several implementations, but typically, the user does 140 not implement new transports; rather, the platform provides some 141 useful transports that are implemented using the platform's best 142 practices. 143 144 The user never instantiates a transport directly; they call a 145 utility function, passing it a protocol factory and other 146 information necessary to create the transport and protocol. (E.g. 147 EventLoop.create_connection() or EventLoop.create_server().) 148 149 The utility function will asynchronously create a transport and a 150 protocol and hook them up by calling the protocol's 151 connection_made() method, passing it the transport. 152 153 The implementation here raises NotImplemented for every method 154 except writelines(), which calls write() in a loop. 155 """ 156 157 158class DatagramTransport(BaseTransport): 159 """Interface for datagram (UDP) transports.""" 160 161 def sendto(self, data, addr=None): 162 """Send data to the transport. 163 164 This does not block; it buffers the data and arranges for it 165 to be sent out asynchronously. 166 addr is target socket address. 167 If addr is None use target address pointed on transport creation. 168 """ 169 raise NotImplementedError 170 171 def abort(self): 172 """Close the transport immediately. 173 174 Buffered data will be lost. No more data will be received. 175 The protocol's connection_lost() method will (eventually) be 176 called with None as its argument. 177 """ 178 raise NotImplementedError 179 180 181class SubprocessTransport(BaseTransport): 182 183 def get_pid(self): 184 """Get subprocess id.""" 185 raise NotImplementedError 186 187 def get_returncode(self): 188 """Get subprocess returncode. 189 190 See also 191 http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode 192 """ 193 raise NotImplementedError 194 195 def get_pipe_transport(self, fd): 196 """Get transport for pipe with number fd.""" 197 raise NotImplementedError 198 199 def send_signal(self, signal): 200 """Send signal to subprocess. 201 202 See also: 203 docs.python.org/3/library/subprocess#subprocess.Popen.send_signal 204 """ 205 raise NotImplementedError 206 207 def terminate(self): 208 """Stop the subprocess. 209 210 Alias for close() method. 211 212 On Posix OSs the method sends SIGTERM to the subprocess. 213 On Windows the Win32 API function TerminateProcess() 214 is called to stop the subprocess. 215 216 See also: 217 http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate 218 """ 219 raise NotImplementedError 220 221 def kill(self): 222 """Kill the subprocess. 223 224 On Posix OSs the function sends SIGKILL to the subprocess. 225 On Windows kill() is an alias for terminate(). 226 227 See also: 228 http://docs.python.org/3/library/subprocess#subprocess.Popen.kill 229 """ 230 raise NotImplementedError 231 232 233class _FlowControlMixin(Transport): 234 """All the logic for (write) flow control in a mix-in base class. 235 236 The subclass must implement get_write_buffer_size(). It must call 237 _maybe_pause_protocol() whenever the write buffer size increases, 238 and _maybe_resume_protocol() whenever it decreases. It may also 239 override set_write_buffer_limits() (e.g. to specify different 240 defaults). 241 242 The subclass constructor must call super().__init__(extra). This 243 will call set_write_buffer_limits(). 244 245 The user may call set_write_buffer_limits() and 246 get_write_buffer_size(), and their protocol's pause_writing() and 247 resume_writing() may be called. 248 """ 249 250 def __init__(self, extra=None, loop=None): 251 super().__init__(extra) 252 assert loop is not None 253 self._loop = loop 254 self._protocol_paused = False 255 self._set_write_buffer_limits() 256 257 def _maybe_pause_protocol(self): 258 size = self.get_write_buffer_size() 259 if size <= self._high_water: 260 return 261 if not self._protocol_paused: 262 self._protocol_paused = True 263 try: 264 self._protocol.pause_writing() 265 except Exception as exc: 266 self._loop.call_exception_handler({ 267 'message': 'protocol.pause_writing() failed', 268 'exception': exc, 269 'transport': self, 270 'protocol': self._protocol, 271 }) 272 273 def _maybe_resume_protocol(self): 274 if (self._protocol_paused and 275 self.get_write_buffer_size() <= self._low_water): 276 self._protocol_paused = False 277 try: 278 self._protocol.resume_writing() 279 except Exception as exc: 280 self._loop.call_exception_handler({ 281 'message': 'protocol.resume_writing() failed', 282 'exception': exc, 283 'transport': self, 284 'protocol': self._protocol, 285 }) 286 287 def get_write_buffer_limits(self): 288 return (self._low_water, self._high_water) 289 290 def _set_write_buffer_limits(self, high=None, low=None): 291 if high is None: 292 if low is None: 293 high = 64 * 1024 294 else: 295 high = 4 * low 296 if low is None: 297 low = high // 4 298 299 if not high >= low >= 0: 300 raise ValueError( 301 f'high ({high!r}) must be >= low ({low!r}) must be >= 0') 302 303 self._high_water = high 304 self._low_water = low 305 306 def set_write_buffer_limits(self, high=None, low=None): 307 self._set_write_buffer_limits(high=high, low=low) 308 self._maybe_pause_protocol() 309 310 def get_write_buffer_size(self): 311 raise NotImplementedError 312