• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Abstract Transport class."""
2
3__all__ = (
4    'BaseTransport', 'ReadTransport', 'WriteTransport',
5    'Transport', 'DatagramTransport', 'SubprocessTransport',
6)
7
8
9class BaseTransport:
10    """Base class for transports."""
11
12    def __init__(self, extra=None):
13        if extra is None:
14            extra = {}
15        self._extra = extra
16
17    def get_extra_info(self, name, default=None):
18        """Get optional transport information."""
19        return self._extra.get(name, default)
20
21    def is_closing(self):
22        """Return True if the transport is closing or closed."""
23        raise NotImplementedError
24
25    def close(self):
26        """Close the transport.
27
28        Buffered data will be flushed asynchronously.  No more data
29        will be received.  After all buffered data is flushed, the
30        protocol's connection_lost() method will (eventually) called
31        with None as its argument.
32        """
33        raise NotImplementedError
34
35    def set_protocol(self, protocol):
36        """Set a new protocol."""
37        raise NotImplementedError
38
39    def get_protocol(self):
40        """Return the current protocol."""
41        raise NotImplementedError
42
43
44class ReadTransport(BaseTransport):
45    """Interface for read-only transports."""
46
47    def is_reading(self):
48        """Return True if the transport is receiving."""
49        raise NotImplementedError
50
51    def pause_reading(self):
52        """Pause the receiving end.
53
54        No data will be passed to the protocol's data_received()
55        method until resume_reading() is called.
56        """
57        raise NotImplementedError
58
59    def resume_reading(self):
60        """Resume the receiving end.
61
62        Data received will once again be passed to the protocol's
63        data_received() method.
64        """
65        raise NotImplementedError
66
67
68class WriteTransport(BaseTransport):
69    """Interface for write-only transports."""
70
71    def set_write_buffer_limits(self, high=None, low=None):
72        """Set the high- and low-water limits for write flow control.
73
74        These two values control when to call the protocol's
75        pause_writing() and resume_writing() methods.  If specified,
76        the low-water limit must be less than or equal to the
77        high-water limit.  Neither value can be negative.
78
79        The defaults are implementation-specific.  If only the
80        high-water limit is given, the low-water limit defaults to an
81        implementation-specific value less than or equal to the
82        high-water limit.  Setting high to zero forces low to zero as
83        well, and causes pause_writing() to be called whenever the
84        buffer becomes non-empty.  Setting low to zero causes
85        resume_writing() to be called only once the buffer is empty.
86        Use of zero for either limit is generally sub-optimal as it
87        reduces opportunities for doing I/O and computation
88        concurrently.
89        """
90        raise NotImplementedError
91
92    def get_write_buffer_size(self):
93        """Return the current size of the write buffer."""
94        raise NotImplementedError
95
96    def write(self, data):
97        """Write some data bytes to the transport.
98
99        This does not block; it buffers the data and arranges for it
100        to be sent out asynchronously.
101        """
102        raise NotImplementedError
103
104    def writelines(self, list_of_data):
105        """Write a list (or any iterable) of data bytes to the transport.
106
107        The default implementation concatenates the arguments and
108        calls write() on the result.
109        """
110        data = b''.join(list_of_data)
111        self.write(data)
112
113    def write_eof(self):
114        """Close the write end after flushing buffered data.
115
116        (This is like typing ^D into a UNIX program reading from stdin.)
117
118        Data may still be received.
119        """
120        raise NotImplementedError
121
122    def can_write_eof(self):
123        """Return True if this transport supports write_eof(), False if not."""
124        raise NotImplementedError
125
126    def abort(self):
127        """Close the transport immediately.
128
129        Buffered data will be lost.  No more data will be received.
130        The protocol's connection_lost() method will (eventually) be
131        called with None as its argument.
132        """
133        raise NotImplementedError
134
135
136class Transport(ReadTransport, WriteTransport):
137    """Interface representing a bidirectional transport.
138
139    There may be several implementations, but typically, the user does
140    not implement new transports; rather, the platform provides some
141    useful transports that are implemented using the platform's best
142    practices.
143
144    The user never instantiates a transport directly; they call a
145    utility function, passing it a protocol factory and other
146    information necessary to create the transport and protocol.  (E.g.
147    EventLoop.create_connection() or EventLoop.create_server().)
148
149    The utility function will asynchronously create a transport and a
150    protocol and hook them up by calling the protocol's
151    connection_made() method, passing it the transport.
152
153    The implementation here raises NotImplemented for every method
154    except writelines(), which calls write() in a loop.
155    """
156
157
158class DatagramTransport(BaseTransport):
159    """Interface for datagram (UDP) transports."""
160
161    def sendto(self, data, addr=None):
162        """Send data to the transport.
163
164        This does not block; it buffers the data and arranges for it
165        to be sent out asynchronously.
166        addr is target socket address.
167        If addr is None use target address pointed on transport creation.
168        """
169        raise NotImplementedError
170
171    def abort(self):
172        """Close the transport immediately.
173
174        Buffered data will be lost.  No more data will be received.
175        The protocol's connection_lost() method will (eventually) be
176        called with None as its argument.
177        """
178        raise NotImplementedError
179
180
181class SubprocessTransport(BaseTransport):
182
183    def get_pid(self):
184        """Get subprocess id."""
185        raise NotImplementedError
186
187    def get_returncode(self):
188        """Get subprocess returncode.
189
190        See also
191        http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
192        """
193        raise NotImplementedError
194
195    def get_pipe_transport(self, fd):
196        """Get transport for pipe with number fd."""
197        raise NotImplementedError
198
199    def send_signal(self, signal):
200        """Send signal to subprocess.
201
202        See also:
203        docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
204        """
205        raise NotImplementedError
206
207    def terminate(self):
208        """Stop the subprocess.
209
210        Alias for close() method.
211
212        On Posix OSs the method sends SIGTERM to the subprocess.
213        On Windows the Win32 API function TerminateProcess()
214         is called to stop the subprocess.
215
216        See also:
217        http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
218        """
219        raise NotImplementedError
220
221    def kill(self):
222        """Kill the subprocess.
223
224        On Posix OSs the function sends SIGKILL to the subprocess.
225        On Windows kill() is an alias for terminate().
226
227        See also:
228        http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
229        """
230        raise NotImplementedError
231
232
233class _FlowControlMixin(Transport):
234    """All the logic for (write) flow control in a mix-in base class.
235
236    The subclass must implement get_write_buffer_size().  It must call
237    _maybe_pause_protocol() whenever the write buffer size increases,
238    and _maybe_resume_protocol() whenever it decreases.  It may also
239    override set_write_buffer_limits() (e.g. to specify different
240    defaults).
241
242    The subclass constructor must call super().__init__(extra).  This
243    will call set_write_buffer_limits().
244
245    The user may call set_write_buffer_limits() and
246    get_write_buffer_size(), and their protocol's pause_writing() and
247    resume_writing() may be called.
248    """
249
250    def __init__(self, extra=None, loop=None):
251        super().__init__(extra)
252        assert loop is not None
253        self._loop = loop
254        self._protocol_paused = False
255        self._set_write_buffer_limits()
256
257    def _maybe_pause_protocol(self):
258        size = self.get_write_buffer_size()
259        if size <= self._high_water:
260            return
261        if not self._protocol_paused:
262            self._protocol_paused = True
263            try:
264                self._protocol.pause_writing()
265            except Exception as exc:
266                self._loop.call_exception_handler({
267                    'message': 'protocol.pause_writing() failed',
268                    'exception': exc,
269                    'transport': self,
270                    'protocol': self._protocol,
271                })
272
273    def _maybe_resume_protocol(self):
274        if (self._protocol_paused and
275                self.get_write_buffer_size() <= self._low_water):
276            self._protocol_paused = False
277            try:
278                self._protocol.resume_writing()
279            except Exception as exc:
280                self._loop.call_exception_handler({
281                    'message': 'protocol.resume_writing() failed',
282                    'exception': exc,
283                    'transport': self,
284                    'protocol': self._protocol,
285                })
286
287    def get_write_buffer_limits(self):
288        return (self._low_water, self._high_water)
289
290    def _set_write_buffer_limits(self, high=None, low=None):
291        if high is None:
292            if low is None:
293                high = 64 * 1024
294            else:
295                high = 4 * low
296        if low is None:
297            low = high // 4
298
299        if not high >= low >= 0:
300            raise ValueError(
301                f'high ({high!r}) must be >= low ({low!r}) must be >= 0')
302
303        self._high_water = high
304        self._low_water = low
305
306    def set_write_buffer_limits(self, high=None, low=None):
307        self._set_write_buffer_limits(high=high, low=low)
308        self._maybe_pause_protocol()
309
310    def get_write_buffer_size(self):
311        raise NotImplementedError
312