• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1.. currentmodule:: asyncio
2
3
4.. _asyncio-transports-protocols:
5
6
7========================
8Transports and Protocols
9========================
10
11.. rubric:: Preface
12
13Transports and Protocols are used by the **low-level** event loop
14APIs such as :meth:`loop.create_connection`.  They use
15callback-based programming style and enable high-performance
16implementations of network or IPC protocols (e.g. HTTP).
17
18Essentially, transports and protocols should only be used in
19libraries and frameworks and never in high-level asyncio
20applications.
21
22This documentation page covers both `Transports`_ and `Protocols`_.
23
24.. rubric:: Introduction
25
26At the highest level, the transport is concerned with *how* bytes
27are transmitted, while the protocol determines *which* bytes to
28transmit (and to some extent when).
29
30A different way of saying the same thing: a transport is an
31abstraction for a socket (or similar I/O endpoint) while a protocol
32is an abstraction for an application, from the transport's point
33of view.
34
35Yet another view is the transport and protocol interfaces
36together define an abstract interface for using network I/O and
37interprocess I/O.
38
39There is always a 1:1 relationship between transport and protocol
40objects: the protocol calls transport methods to send data,
41while the transport calls protocol methods to pass it data that
42has been received.
43
44Most of connection oriented event loop methods
45(such as :meth:`loop.create_connection`) usually accept a
46*protocol_factory* argument used to create a *Protocol* object
47for an accepted connection, represented by a *Transport* object.
48Such methods usually return a tuple of ``(transport, protocol)``.
49
50.. rubric:: Contents
51
52This documentation page contains the following sections:
53
54* The `Transports`_ section documents asyncio :class:`BaseTransport`,
55  :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
56  :class:`DatagramTransport`, and :class:`SubprocessTransport`
57  classes.
58
59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
60  :class:`Protocol`, :class:`BufferedProtocol`,
61  :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.
62
63* The `Examples`_ section showcases how to work with transports,
64  protocols, and low-level event loop APIs.
65
66
67.. _asyncio-transport:
68
69Transports
70==========
71
72**Source code:** :source:`Lib/asyncio/transports.py`
73
74----------------------------------------------------
75
76Transports are classes provided by :mod:`asyncio` in order to abstract
77various kinds of communication channels.
78
79Transport objects are always instantiated by an
80:ref:`asyncio event loop <asyncio-event-loop>`.
81
82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
83The methods available on a transport depend on the transport's kind.
84
85The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
86
87
88Transports Hierarchy
89--------------------
90
91.. class:: BaseTransport
92
93   Base class for all transports.  Contains methods that all
94   asyncio transports share.
95
96.. class:: WriteTransport(BaseTransport)
97
98   A base transport for write-only connections.
99
100   Instances of the *WriteTransport* class are returned from
101   the :meth:`loop.connect_write_pipe` event loop method and
102   are also used by subprocess-related methods like
103   :meth:`loop.subprocess_exec`.
104
105.. class:: ReadTransport(BaseTransport)
106
107   A base transport for read-only connections.
108
109   Instances of the *ReadTransport* class are returned from
110   the :meth:`loop.connect_read_pipe` event loop method and
111   are also used by subprocess-related methods like
112   :meth:`loop.subprocess_exec`.
113
114.. class:: Transport(WriteTransport, ReadTransport)
115
116   Interface representing a bidirectional transport, such as a
117   TCP connection.
118
119   The user does not instantiate a transport directly; they call a
120   utility function, passing it a protocol factory and other
121   information necessary to create the transport and protocol.
122
123   Instances of the *Transport* class are returned from or used by
124   event loop methods like :meth:`loop.create_connection`,
125   :meth:`loop.create_unix_connection`,
126   :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.
127
128
129.. class:: DatagramTransport(BaseTransport)
130
131   A transport for datagram (UDP) connections.
132
133   Instances of the *DatagramTransport* class are returned from
134   the :meth:`loop.create_datagram_endpoint` event loop method.
135
136
137.. class:: SubprocessTransport(BaseTransport)
138
139   An abstraction to represent a connection between a parent and its
140   child OS process.
141
142   Instances of the *SubprocessTransport* class are returned from
143   event loop methods :meth:`loop.subprocess_shell` and
144   :meth:`loop.subprocess_exec`.
145
146
147Base Transport
148--------------
149
150.. method:: BaseTransport.close()
151
152   Close the transport.
153
154   If the transport has a buffer for outgoing
155   data, buffered data will be flushed asynchronously.  No more data
156   will be received.  After all buffered data is flushed, the
157   protocol's :meth:`protocol.connection_lost()
158   <BaseProtocol.connection_lost>` method will be called with
159   :const:`None` as its argument. The transport should not be
160   used once it is closed.
161
162.. method:: BaseTransport.is_closing()
163
164   Return ``True`` if the transport is closing or is closed.
165
166.. method:: BaseTransport.get_extra_info(name, default=None)
167
168   Return information about the transport or underlying resources
169   it uses.
170
171   *name* is a string representing the piece of transport-specific
172   information to get.
173
174   *default* is the value to return if the information is not
175   available, or if the transport does not support querying it
176   with the given third-party event loop implementation or on the
177   current platform.
178
179   For example, the following code attempts to get the underlying
180   socket object of the transport::
181
182      sock = transport.get_extra_info('socket')
183      if sock is not None:
184          print(sock.getsockopt(...))
185
186   Categories of information that can be queried on some transports:
187
188   * socket:
189
190     - ``'peername'``: the remote address to which the socket is
191       connected, result of :meth:`socket.socket.getpeername`
192       (``None`` on error)
193
194     - ``'socket'``: :class:`socket.socket` instance
195
196     - ``'sockname'``: the socket's own address,
197       result of :meth:`socket.socket.getsockname`
198
199   * SSL socket:
200
201     - ``'compression'``: the compression algorithm being used as a
202       string, or ``None`` if the connection isn't compressed; result
203       of :meth:`ssl.SSLSocket.compression`
204
205     - ``'cipher'``: a three-value tuple containing the name of the
206       cipher being used, the version of the SSL protocol that defines
207       its use, and the number of secret bits being used; result of
208       :meth:`ssl.SSLSocket.cipher`
209
210     - ``'peercert'``: peer certificate; result of
211       :meth:`ssl.SSLSocket.getpeercert`
212
213     - ``'sslcontext'``: :class:`ssl.SSLContext` instance
214
215     - ``'ssl_object'``: :class:`ssl.SSLObject` or
216       :class:`ssl.SSLSocket` instance
217
218   * pipe:
219
220     - ``'pipe'``: pipe object
221
222   * subprocess:
223
224     - ``'subprocess'``: :class:`subprocess.Popen` instance
225
226.. method:: BaseTransport.set_protocol(protocol)
227
228   Set a new protocol.
229
230   Switching protocol should only be done when both
231   protocols are documented to support the switch.
232
233.. method:: BaseTransport.get_protocol()
234
235   Return the current protocol.
236
237
238Read-only Transports
239--------------------
240
241.. method:: ReadTransport.is_reading()
242
243   Return ``True`` if the transport is receiving new data.
244
245   .. versionadded:: 3.7
246
247.. method:: ReadTransport.pause_reading()
248
249   Pause the receiving end of the transport.  No data will be passed to
250   the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
251   method until :meth:`resume_reading` is called.
252
253   .. versionchanged:: 3.7
254      The method is idempotent, i.e. it can be called when the
255      transport is already paused or closed.
256
257.. method:: ReadTransport.resume_reading()
258
259   Resume the receiving end.  The protocol's
260   :meth:`protocol.data_received() <Protocol.data_received>` method
261   will be called once again if some data is available for reading.
262
263   .. versionchanged:: 3.7
264      The method is idempotent, i.e. it can be called when the
265      transport is already reading.
266
267
268Write-only Transports
269---------------------
270
271.. method:: WriteTransport.abort()
272
273   Close the transport immediately, without waiting for pending operations
274   to complete.  Buffered data will be lost.  No more data will be received.
275   The protocol's :meth:`protocol.connection_lost()
276   <BaseProtocol.connection_lost>` method will eventually be
277   called with :const:`None` as its argument.
278
279.. method:: WriteTransport.can_write_eof()
280
281   Return :const:`True` if the transport supports
282   :meth:`~WriteTransport.write_eof`, :const:`False` if not.
283
284.. method:: WriteTransport.get_write_buffer_size()
285
286   Return the current size of the output buffer used by the transport.
287
288.. method:: WriteTransport.get_write_buffer_limits()
289
290   Get the *high* and *low* watermarks for write flow control. Return a
291   tuple ``(low, high)`` where *low* and *high* are positive number of
292   bytes.
293
294   Use :meth:`set_write_buffer_limits` to set the limits.
295
296   .. versionadded:: 3.4.2
297
298.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)
299
300   Set the *high* and *low* watermarks for write flow control.
301
302   These two values (measured in number of
303   bytes) control when the protocol's
304   :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
305   and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
306   methods are called. If specified, the low watermark must be less
307   than or equal to the high watermark.  Neither *high* nor *low*
308   can be negative.
309
310   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
311   becomes greater than or equal to the *high* value. If writing has
312   been paused, :meth:`~BaseProtocol.resume_writing` is called when
313   the buffer size becomes less than or equal to the *low* value.
314
315   The defaults are implementation-specific.  If only the
316   high watermark is given, the low watermark defaults to an
317   implementation-specific value less than or equal to the
318   high watermark.  Setting *high* to zero forces *low* to zero as
319   well, and causes :meth:`~BaseProtocol.pause_writing` to be called
320   whenever the buffer becomes non-empty.  Setting *low* to zero causes
321   :meth:`~BaseProtocol.resume_writing` to be called only once the
322   buffer is empty. Use of zero for either limit is generally
323   sub-optimal as it reduces opportunities for doing I/O and
324   computation concurrently.
325
326   Use :meth:`~WriteTransport.get_write_buffer_limits`
327   to get the limits.
328
329.. method:: WriteTransport.write(data)
330
331   Write some *data* bytes to the transport.
332
333   This method does not block; it buffers the data and arranges for it
334   to be sent out asynchronously.
335
336.. method:: WriteTransport.writelines(list_of_data)
337
338   Write a list (or any iterable) of data bytes to the transport.
339   This is functionally equivalent to calling :meth:`write` on each
340   element yielded by the iterable, but may be implemented more
341   efficiently.
342
343.. method:: WriteTransport.write_eof()
344
345   Close the write end of the transport after flushing all buffered data.
346   Data may still be received.
347
348   This method can raise :exc:`NotImplementedError` if the transport
349   (e.g. SSL) doesn't support half-closed connections.
350
351
352Datagram Transports
353-------------------
354
355.. method:: DatagramTransport.sendto(data, addr=None)
356
357   Send the *data* bytes to the remote peer given by *addr* (a
358   transport-dependent target address).  If *addr* is :const:`None`,
359   the data is sent to the target address given on transport
360   creation.
361
362   This method does not block; it buffers the data and arranges
363   for it to be sent out asynchronously.
364
365   .. versionchanged:: 3.13
366      This method can be called with an empty bytes object to send a
367      zero-length datagram. The buffer size calculation used for flow
368      control is also updated to account for the datagram header.
369
370.. method:: DatagramTransport.abort()
371
372   Close the transport immediately, without waiting for pending
373   operations to complete.  Buffered data will be lost.
374   No more data will be received.  The protocol's
375   :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
376   method will eventually be called with :const:`None` as its argument.
377
378
379.. _asyncio-subprocess-transports:
380
381Subprocess Transports
382---------------------
383
384.. method:: SubprocessTransport.get_pid()
385
386   Return the subprocess process id as an integer.
387
388.. method:: SubprocessTransport.get_pipe_transport(fd)
389
390   Return the transport for the communication pipe corresponding to the
391   integer file descriptor *fd*:
392
393   * ``0``: readable streaming transport of the standard input (*stdin*),
394     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
395   * ``1``: writable streaming transport of the standard output (*stdout*),
396     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
397   * ``2``: writable streaming transport of the standard error (*stderr*),
398     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
399   * other *fd*: :const:`None`
400
401.. method:: SubprocessTransport.get_returncode()
402
403   Return the subprocess return code as an integer or :const:`None`
404   if it hasn't returned, which is similar to the
405   :attr:`subprocess.Popen.returncode` attribute.
406
407.. method:: SubprocessTransport.kill()
408
409   Kill the subprocess.
410
411   On POSIX systems, the function sends SIGKILL to the subprocess.
412   On Windows, this method is an alias for :meth:`terminate`.
413
414   See also :meth:`subprocess.Popen.kill`.
415
416.. method:: SubprocessTransport.send_signal(signal)
417
418   Send the *signal* number to the subprocess, as in
419   :meth:`subprocess.Popen.send_signal`.
420
421.. method:: SubprocessTransport.terminate()
422
423   Stop the subprocess.
424
425   On POSIX systems, this method sends :py:const:`~signal.SIGTERM` to the subprocess.
426   On Windows, the Windows API function :c:func:`!TerminateProcess` is called to
427   stop the subprocess.
428
429   See also :meth:`subprocess.Popen.terminate`.
430
431.. method:: SubprocessTransport.close()
432
433   Kill the subprocess by calling the :meth:`kill` method.
434
435   If the subprocess hasn't returned yet, and close transports of
436   *stdin*, *stdout*, and *stderr* pipes.
437
438
439.. _asyncio-protocol:
440
441Protocols
442=========
443
444**Source code:** :source:`Lib/asyncio/protocols.py`
445
446---------------------------------------------------
447
448asyncio provides a set of abstract base classes that should be used
449to implement network protocols.  Those classes are meant to be used
450together with :ref:`transports <asyncio-transport>`.
451
452Subclasses of abstract base protocol classes may implement some or
453all methods.  All these methods are callbacks: they are called by
454transports on certain events, for example when some data is received.
455A base protocol method should be called by the corresponding transport.
456
457
458Base Protocols
459--------------
460
461.. class:: BaseProtocol
462
463   Base protocol with methods that all protocols share.
464
465.. class:: Protocol(BaseProtocol)
466
467   The base class for implementing streaming protocols
468   (TCP, Unix sockets, etc).
469
470.. class:: BufferedProtocol(BaseProtocol)
471
472   A base class for implementing streaming protocols with manual
473   control of the receive buffer.
474
475.. class:: DatagramProtocol(BaseProtocol)
476
477   The base class for implementing datagram (UDP) protocols.
478
479.. class:: SubprocessProtocol(BaseProtocol)
480
481   The base class for implementing protocols communicating with child
482   processes (unidirectional pipes).
483
484
485Base Protocol
486-------------
487
488All asyncio protocols can implement Base Protocol callbacks.
489
490.. rubric:: Connection Callbacks
491
492Connection callbacks are called on all protocols, exactly once per
493a successful connection.  All other protocol callbacks can only be
494called between those two methods.
495
496.. method:: BaseProtocol.connection_made(transport)
497
498   Called when a connection is made.
499
500   The *transport* argument is the transport representing the
501   connection.  The protocol is responsible for storing the reference
502   to its transport.
503
504.. method:: BaseProtocol.connection_lost(exc)
505
506   Called when the connection is lost or closed.
507
508   The argument is either an exception object or :const:`None`.
509   The latter means a regular EOF is received, or the connection was
510   aborted or closed by this side of the connection.
511
512
513.. rubric:: Flow Control Callbacks
514
515Flow control callbacks can be called by transports to pause or
516resume writing performed by the protocol.
517
518See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
519method for more details.
520
521.. method:: BaseProtocol.pause_writing()
522
523   Called when the transport's buffer goes over the high watermark.
524
525.. method:: BaseProtocol.resume_writing()
526
527   Called when the transport's buffer drains below the low watermark.
528
529If the buffer size equals the high watermark,
530:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
531go strictly over.
532
533Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
534buffer size is equal or lower than the low watermark.  These end
535conditions are important to ensure that things go as expected when
536either mark is zero.
537
538
539Streaming Protocols
540-------------------
541
542Event methods, such as :meth:`loop.create_server`,
543:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
544:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
545:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
546accept factories that return streaming protocols.
547
548.. method:: Protocol.data_received(data)
549
550   Called when some data is received.  *data* is a non-empty bytes
551   object containing the incoming data.
552
553   Whether the data is buffered, chunked or reassembled depends on
554   the transport.  In general, you shouldn't rely on specific semantics
555   and instead make your parsing generic and flexible. However,
556   data is always received in the correct order.
557
558   The method can be called an arbitrary number of times while
559   a connection is open.
560
561   However, :meth:`protocol.eof_received() <Protocol.eof_received>`
562   is called at most once.  Once ``eof_received()`` is called,
563   ``data_received()`` is not called anymore.
564
565.. method:: Protocol.eof_received()
566
567   Called when the other end signals it won't send any more data
568   (for example by calling :meth:`transport.write_eof()
569   <WriteTransport.write_eof>`, if the other end also uses
570   asyncio).
571
572   This method may return a false value (including ``None``), in which case
573   the transport will close itself.  Conversely, if this method returns a
574   true value, the protocol used determines whether to close the transport.
575   Since the default implementation returns ``None``, it implicitly closes the
576   connection.
577
578   Some transports, including SSL, don't support half-closed connections,
579   in which case returning true from this method will result in the connection
580   being closed.
581
582
583State machine:
584
585.. code-block:: none
586
587    start -> connection_made
588        [-> data_received]*
589        [-> eof_received]?
590    -> connection_lost -> end
591
592
593Buffered Streaming Protocols
594----------------------------
595
596.. versionadded:: 3.7
597
598Buffered Protocols can be used with any event loop method
599that supports `Streaming Protocols`_.
600
601``BufferedProtocol`` implementations allow explicit manual allocation
602and control of the receive buffer.  Event loops can then use the buffer
603provided by the protocol to avoid unnecessary data copies.  This
604can result in noticeable performance improvement for protocols that
605receive big amounts of data.  Sophisticated protocol implementations
606can significantly reduce the number of buffer allocations.
607
608The following callbacks are called on :class:`BufferedProtocol`
609instances:
610
611.. method:: BufferedProtocol.get_buffer(sizehint)
612
613   Called to allocate a new receive buffer.
614
615   *sizehint* is the recommended minimum size for the returned
616   buffer.  It is acceptable to return smaller or larger buffers
617   than what *sizehint* suggests.  When set to -1, the buffer size
618   can be arbitrary. It is an error to return a buffer with a zero size.
619
620   ``get_buffer()`` must return an object implementing the
621   :ref:`buffer protocol <bufferobjects>`.
622
623.. method:: BufferedProtocol.buffer_updated(nbytes)
624
625   Called when the buffer was updated with the received data.
626
627   *nbytes* is the total number of bytes that were written to the buffer.
628
629.. method:: BufferedProtocol.eof_received()
630
631   See the documentation of the :meth:`protocol.eof_received()
632   <Protocol.eof_received>` method.
633
634
635:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
636of times during a connection.  However, :meth:`protocol.eof_received()
637<Protocol.eof_received>` is called at most once
638and, if called, :meth:`~BufferedProtocol.get_buffer` and
639:meth:`~BufferedProtocol.buffer_updated` won't be called after it.
640
641State machine:
642
643.. code-block:: none
644
645    start -> connection_made
646        [-> get_buffer
647            [-> buffer_updated]?
648        ]*
649        [-> eof_received]?
650    -> connection_lost -> end
651
652
653Datagram Protocols
654------------------
655
656Datagram Protocol instances should be constructed by protocol
657factories passed to the :meth:`loop.create_datagram_endpoint` method.
658
659.. method:: DatagramProtocol.datagram_received(data, addr)
660
661   Called when a datagram is received.  *data* is a bytes object containing
662   the incoming data.  *addr* is the address of the peer sending the data;
663   the exact format depends on the transport.
664
665.. method:: DatagramProtocol.error_received(exc)
666
667   Called when a previous send or receive operation raises an
668   :class:`OSError`.  *exc* is the :class:`OSError` instance.
669
670   This method is called in rare conditions, when the transport (e.g. UDP)
671   detects that a datagram could not be delivered to its recipient.
672   In many conditions though, undeliverable datagrams will be silently
673   dropped.
674
675.. note::
676
677   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
678   for datagram protocols, because there is no reliable way to detect send
679   failures caused by writing too many packets.
680
681   The socket always appears 'ready' and excess packets are dropped. An
682   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
683   or may not be raised; if it is raised, it will be reported to
684   :meth:`DatagramProtocol.error_received` but otherwise ignored.
685
686
687.. _asyncio-subprocess-protocols:
688
689Subprocess Protocols
690--------------------
691
692Subprocess Protocol instances should be constructed by protocol
693factories passed to the :meth:`loop.subprocess_exec` and
694:meth:`loop.subprocess_shell` methods.
695
696.. method:: SubprocessProtocol.pipe_data_received(fd, data)
697
698   Called when the child process writes data into its stdout or stderr
699   pipe.
700
701   *fd* is the integer file descriptor of the pipe.
702
703   *data* is a non-empty bytes object containing the received data.
704
705.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
706
707   Called when one of the pipes communicating with the child process
708   is closed.
709
710   *fd* is the integer file descriptor that was closed.
711
712.. method:: SubprocessProtocol.process_exited()
713
714   Called when the child process has exited.
715
716   It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and
717   :meth:`~SubprocessProtocol.pipe_connection_lost` methods.
718
719
720Examples
721========
722
723.. _asyncio_example_tcp_echo_server_protocol:
724
725TCP Echo Server
726---------------
727
728Create a TCP echo server using the :meth:`loop.create_server` method, send back
729received data, and close the connection::
730
731    import asyncio
732
733
734    class EchoServerProtocol(asyncio.Protocol):
735        def connection_made(self, transport):
736            peername = transport.get_extra_info('peername')
737            print('Connection from {}'.format(peername))
738            self.transport = transport
739
740        def data_received(self, data):
741            message = data.decode()
742            print('Data received: {!r}'.format(message))
743
744            print('Send: {!r}'.format(message))
745            self.transport.write(data)
746
747            print('Close the client socket')
748            self.transport.close()
749
750
751    async def main():
752        # Get a reference to the event loop as we plan to use
753        # low-level APIs.
754        loop = asyncio.get_running_loop()
755
756        server = await loop.create_server(
757            EchoServerProtocol,
758            '127.0.0.1', 8888)
759
760        async with server:
761            await server.serve_forever()
762
763
764    asyncio.run(main())
765
766
767.. seealso::
768
769   The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
770   example uses the high-level :func:`asyncio.start_server` function.
771
772.. _asyncio_example_tcp_echo_client_protocol:
773
774TCP Echo Client
775---------------
776
777A TCP echo client using the :meth:`loop.create_connection` method, sends
778data, and waits until the connection is closed::
779
780    import asyncio
781
782
783    class EchoClientProtocol(asyncio.Protocol):
784        def __init__(self, message, on_con_lost):
785            self.message = message
786            self.on_con_lost = on_con_lost
787
788        def connection_made(self, transport):
789            transport.write(self.message.encode())
790            print('Data sent: {!r}'.format(self.message))
791
792        def data_received(self, data):
793            print('Data received: {!r}'.format(data.decode()))
794
795        def connection_lost(self, exc):
796            print('The server closed the connection')
797            self.on_con_lost.set_result(True)
798
799
800    async def main():
801        # Get a reference to the event loop as we plan to use
802        # low-level APIs.
803        loop = asyncio.get_running_loop()
804
805        on_con_lost = loop.create_future()
806        message = 'Hello World!'
807
808        transport, protocol = await loop.create_connection(
809            lambda: EchoClientProtocol(message, on_con_lost),
810            '127.0.0.1', 8888)
811
812        # Wait until the protocol signals that the connection
813        # is lost and close the transport.
814        try:
815            await on_con_lost
816        finally:
817            transport.close()
818
819
820    asyncio.run(main())
821
822
823.. seealso::
824
825   The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
826   example uses the high-level :func:`asyncio.open_connection` function.
827
828
829.. _asyncio-udp-echo-server-protocol:
830
831UDP Echo Server
832---------------
833
834A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
835method, sends back received data::
836
837    import asyncio
838
839
840    class EchoServerProtocol:
841        def connection_made(self, transport):
842            self.transport = transport
843
844        def datagram_received(self, data, addr):
845            message = data.decode()
846            print('Received %r from %s' % (message, addr))
847            print('Send %r to %s' % (message, addr))
848            self.transport.sendto(data, addr)
849
850
851    async def main():
852        print("Starting UDP server")
853
854        # Get a reference to the event loop as we plan to use
855        # low-level APIs.
856        loop = asyncio.get_running_loop()
857
858        # One protocol instance will be created to serve all
859        # client requests.
860        transport, protocol = await loop.create_datagram_endpoint(
861            EchoServerProtocol,
862            local_addr=('127.0.0.1', 9999))
863
864        try:
865            await asyncio.sleep(3600)  # Serve for 1 hour.
866        finally:
867            transport.close()
868
869
870    asyncio.run(main())
871
872
873.. _asyncio-udp-echo-client-protocol:
874
875UDP Echo Client
876---------------
877
878A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
879method, sends data and closes the transport when it receives the answer::
880
881    import asyncio
882
883
884    class EchoClientProtocol:
885        def __init__(self, message, on_con_lost):
886            self.message = message
887            self.on_con_lost = on_con_lost
888            self.transport = None
889
890        def connection_made(self, transport):
891            self.transport = transport
892            print('Send:', self.message)
893            self.transport.sendto(self.message.encode())
894
895        def datagram_received(self, data, addr):
896            print("Received:", data.decode())
897
898            print("Close the socket")
899            self.transport.close()
900
901        def error_received(self, exc):
902            print('Error received:', exc)
903
904        def connection_lost(self, exc):
905            print("Connection closed")
906            self.on_con_lost.set_result(True)
907
908
909    async def main():
910        # Get a reference to the event loop as we plan to use
911        # low-level APIs.
912        loop = asyncio.get_running_loop()
913
914        on_con_lost = loop.create_future()
915        message = "Hello World!"
916
917        transport, protocol = await loop.create_datagram_endpoint(
918            lambda: EchoClientProtocol(message, on_con_lost),
919            remote_addr=('127.0.0.1', 9999))
920
921        try:
922            await on_con_lost
923        finally:
924            transport.close()
925
926
927    asyncio.run(main())
928
929
930.. _asyncio_example_create_connection:
931
932Connecting Existing Sockets
933---------------------------
934
935Wait until a socket receives data using the
936:meth:`loop.create_connection` method with a protocol::
937
938    import asyncio
939    import socket
940
941
942    class MyProtocol(asyncio.Protocol):
943
944        def __init__(self, on_con_lost):
945            self.transport = None
946            self.on_con_lost = on_con_lost
947
948        def connection_made(self, transport):
949            self.transport = transport
950
951        def data_received(self, data):
952            print("Received:", data.decode())
953
954            # We are done: close the transport;
955            # connection_lost() will be called automatically.
956            self.transport.close()
957
958        def connection_lost(self, exc):
959            # The socket has been closed
960            self.on_con_lost.set_result(True)
961
962
963    async def main():
964        # Get a reference to the event loop as we plan to use
965        # low-level APIs.
966        loop = asyncio.get_running_loop()
967        on_con_lost = loop.create_future()
968
969        # Create a pair of connected sockets
970        rsock, wsock = socket.socketpair()
971
972        # Register the socket to wait for data.
973        transport, protocol = await loop.create_connection(
974            lambda: MyProtocol(on_con_lost), sock=rsock)
975
976        # Simulate the reception of data from the network.
977        loop.call_soon(wsock.send, 'abc'.encode())
978
979        try:
980            await protocol.on_con_lost
981        finally:
982            transport.close()
983            wsock.close()
984
985    asyncio.run(main())
986
987.. seealso::
988
989   The :ref:`watch a file descriptor for read events
990   <asyncio_example_watch_fd>` example uses the low-level
991   :meth:`loop.add_reader` method to register an FD.
992
993   The :ref:`register an open socket to wait for data using streams
994   <asyncio_example_create_connection-streams>` example uses high-level streams
995   created by the :func:`open_connection` function in a coroutine.
996
997.. _asyncio_example_subprocess_proto:
998
999loop.subprocess_exec() and SubprocessProtocol
1000---------------------------------------------
1001
1002An example of a subprocess protocol used to get the output of a
1003subprocess and to wait for the subprocess exit.
1004
1005The subprocess is created by the :meth:`loop.subprocess_exec` method::
1006
1007    import asyncio
1008    import sys
1009
1010    class DateProtocol(asyncio.SubprocessProtocol):
1011        def __init__(self, exit_future):
1012            self.exit_future = exit_future
1013            self.output = bytearray()
1014            self.pipe_closed = False
1015            self.exited = False
1016
1017        def pipe_connection_lost(self, fd, exc):
1018            self.pipe_closed = True
1019            self.check_for_exit()
1020
1021        def pipe_data_received(self, fd, data):
1022            self.output.extend(data)
1023
1024        def process_exited(self):
1025            self.exited = True
1026            # process_exited() method can be called before
1027            # pipe_connection_lost() method: wait until both methods are
1028            # called.
1029            self.check_for_exit()
1030
1031        def check_for_exit(self):
1032            if self.pipe_closed and self.exited:
1033                self.exit_future.set_result(True)
1034
1035    async def get_date():
1036        # Get a reference to the event loop as we plan to use
1037        # low-level APIs.
1038        loop = asyncio.get_running_loop()
1039
1040        code = 'import datetime; print(datetime.datetime.now())'
1041        exit_future = asyncio.Future(loop=loop)
1042
1043        # Create the subprocess controlled by DateProtocol;
1044        # redirect the standard output into a pipe.
1045        transport, protocol = await loop.subprocess_exec(
1046            lambda: DateProtocol(exit_future),
1047            sys.executable, '-c', code,
1048            stdin=None, stderr=None)
1049
1050        # Wait for the subprocess exit using the process_exited()
1051        # method of the protocol.
1052        await exit_future
1053
1054        # Close the stdout pipe.
1055        transport.close()
1056
1057        # Read the output which was collected by the
1058        # pipe_data_received() method of the protocol.
1059        data = bytes(protocol.output)
1060        return data.decode('ascii').rstrip()
1061
1062    date = asyncio.run(get_date())
1063    print(f"Current date: {date}")
1064
1065See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
1066written using high-level APIs.
1067