• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Abstract Transport class."""
2
3from asyncio import compat
4
# Names exported by ``from <this module> import *``.
__all__ = [
    'BaseTransport',
    'ReadTransport',
    'WriteTransport',
    'Transport',
    'DatagramTransport',
    'SubprocessTransport',
]
8
9
class BaseTransport:
    """Common base for all transport interfaces.

    Holds a mapping of optional extra information and declares the
    methods shared by every transport.  Apart from the constructor and
    get_extra_info(), every method here raises NotImplementedError.
    """

    def __init__(self, extra=None):
        """Store *extra*; a fresh empty dict is used when it is None."""
        self._extra = {} if extra is None else extra

    def get_extra_info(self, name, default=None):
        """Return the optional transport information stored under *name*.

        *default* is returned when no such entry exists.
        """
        return self._extra.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or already closed."""
        raise NotImplementedError

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the current protocol with *protocol*."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the protocol currently in use."""
        raise NotImplementedError
43
44
class ReadTransport(BaseTransport):
    """Interface for transports that can only be read from."""

    def pause_reading(self):
        """Stop delivering incoming data to the protocol.

        The protocol's data_received() method will not be called
        again until resume_reading() is called.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Start delivering incoming data to the protocol again.

        Received data is once more passed to the protocol's
        data_received() method.
        """
        raise NotImplementedError
63
64
class WriteTransport(BaseTransport):
    """Interface for transports that can only be written to."""

    def set_write_buffer_limits(self, high=None, low=None):
        """Configure the high- and low-water marks for write flow control.

        The two limits determine when the protocol's pause_writing()
        and resume_writing() methods get called.  When given, the
        low-water limit must not exceed the high-water limit, and
        neither limit may be negative.

        Defaults are implementation-specific.  When only the
        high-water limit is supplied, the low-water limit defaults to
        an implementation-specific value no greater than the
        high-water limit.  A high-water limit of zero forces the
        low-water limit to zero too, so pause_writing() is called
        whenever the buffer becomes non-empty.  A low-water limit of
        zero means resume_writing() is called only once the buffer is
        empty.  Zero is generally a sub-optimal choice for either
        limit because it reduces the opportunities for doing I/O and
        computation concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return how much data the write buffer currently holds."""
        raise NotImplementedError

    def write(self, data):
        """Queue some data bytes for sending over the transport.

        The call does not block: the data is buffered and sent out
        asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        This default implementation joins the items into one bytes
        object and hands it to write() in a single call.
        """
        payload = compat.flatten_list_bytes(list_of_data)
        self.write(payload)

    def write_eof(self):
        """Close the write end once buffered data has been flushed.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), else False."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
131
132
class Transport(ReadTransport, WriteTransport):
    """Interface for a transport that can be both read and written.

    Several implementations may exist, but users typically do not
    write new transports themselves; the platform supplies useful
    transports implemented using the platform's best practices.

    A transport is never instantiated directly by the user.  Instead
    they call a utility function, giving it a protocol factory plus
    whatever other information is needed to create the transport and
    protocol.  (E.g. EventLoop.create_connection() or
    EventLoop.create_server().)

    That utility function asynchronously creates a transport and a
    protocol, then connects them by calling the protocol's
    connection_made() method with the transport as its argument.

    Every method of this interface raises NotImplementedError, except
    writelines(), which concatenates its arguments and passes the
    result to write() in a single call.
    """
153
154
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports."""

    def sendto(self, data, addr=None):
        """Send data through the transport.

        The call does not block: the data is buffered and sent out
        asynchronously.

        *addr* is the target socket address.  When it is None, the
        target address given at transport creation is used.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport right away.

        Buffered data will be lost and no more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
176
177
class SubprocessTransport(BaseTransport):
    """Interface for subprocess transports."""

    def get_pid(self):
        """Return the subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Return the subprocess returncode.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Return the transport for the pipe with number *fd*."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Send *signal* to the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On POSIX systems this method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess() is called
        to stop the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On POSIX systems this method sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.kill
        """
        raise NotImplementedError
228
229
class _FlowControlMixin(Transport):
    """Mix-in base class holding all the (write) flow control logic.

    A subclass must implement get_write_buffer_size().  It must call
    _maybe_pause_protocol() every time the write buffer grows and
    _maybe_resume_protocol() every time it shrinks.  It may also
    override set_write_buffer_limits() (e.g. to specify different
    defaults).

    The subclass constructor must call super().__init__(extra, loop)
    with a loop that is not None.  That call installs the default
    limits through _set_write_buffer_limits().

    The pause/resume methods read self._protocol, which this class
    never assigns; the subclass is expected to provide it.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None, loop=None):
        """Set up flow-control state; *loop* must not be None."""
        super().__init__(extra)
        assert loop is not None
        self._loop = loop
        self._protocol_paused = False
        # Installs the default high/low-water marks.
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer is above the high-water mark.

        Nothing happens when the protocol is already paused.  An
        exception raised by pause_writing() is reported to the loop's
        exception handler rather than propagated.
        """
        if self.get_write_buffer_size() <= self._high_water:
            return
        if self._protocol_paused:
            return
        # The flag is set before the callback runs, so it stays set
        # even when pause_writing() raises.
        self._protocol_paused = True
        try:
            self._protocol.pause_writing()
        except Exception as exc:
            self._loop.call_exception_handler({
                'message': 'protocol.pause_writing() failed',
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer reaches the low-water mark.

        An exception raised by resume_writing() is reported to the
        loop's exception handler rather than propagated.
        """
        if not self._protocol_paused:
            return
        if not (self.get_write_buffer_size() <= self._low_water):
            return
        self._protocol_paused = False
        try:
            self._protocol.resume_writing()
        except Exception as exc:
            self._loop.call_exception_handler({
                'message': 'protocol.resume_writing() failed',
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })

    def get_write_buffer_limits(self):
        """Return the current limits as a (low, high) tuple."""
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the high- and low-water marks.

        With neither limit given, high defaults to 64 KiB.  A missing
        high is otherwise derived as 4 * low, and a missing low as
        high // 4.  Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            high = 64 * 1024 if low is None else 4 * low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def set_write_buffer_limits(self, high=None, low=None):
        """Update the limits, then pause the protocol if now over the limit."""
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current write buffer size; subclasses must override."""
        raise NotImplementedError
307