• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1.. currentmodule:: asyncio
2
3
4.. _asyncio-transports-protocols:
5
6
7========================
8Transports and Protocols
9========================
10
11.. rubric:: Preface
12
13Transports and Protocols are used by the **low-level** event loop
14APIs such as :meth:`loop.create_connection`.  They use
15callback-based programming style and enable high-performance
16implementations of network or IPC protocols (e.g. HTTP).
17
18Essentially, transports and protocols should only be used in
19libraries and frameworks and never in high-level asyncio
20applications.
21
22This documentation page covers both `Transports`_ and `Protocols`_.
23
24.. rubric:: Introduction
25
26At the highest level, the transport is concerned with *how* bytes
27are transmitted, while the protocol determines *which* bytes to
28transmit (and to some extent when).
29
30A different way of saying the same thing: a transport is an
31abstraction for a socket (or similar I/O endpoint) while a protocol
32is an abstraction for an application, from the transport's point
33of view.
34
35Yet another view is the transport and protocol interfaces
36together define an abstract interface for using network I/O and
37interprocess I/O.
38
39There is always a 1:1 relationship between transport and protocol
40objects: the protocol calls transport methods to send data,
41while the transport calls protocol methods to pass it data that
42has been received.
43
44Most of connection oriented event loop methods
45(such as :meth:`loop.create_connection`) usually accept a
46*protocol_factory* argument used to create a *Protocol* object
47for an accepted connection, represented by a *Transport* object.
48Such methods usually return a tuple of ``(transport, protocol)``.
49
50.. rubric:: Contents
51
52This documentation page contains the following sections:
53
54* The `Transports`_ section documents asyncio :class:`BaseTransport`,
55  :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
56  :class:`DatagramTransport`, and :class:`SubprocessTransport`
57  classes.
58
59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
60  :class:`Protocol`, :class:`BufferedProtocol`,
61  :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.
62
63* The `Examples`_ section showcases how to work with transports,
64  protocols, and low-level event loop APIs.
65
66
67.. _asyncio-transport:
68
69Transports
70==========
71
72**Source code:** :source:`Lib/asyncio/transports.py`
73
74----------------------------------------------------
75
76Transports are classes provided by :mod:`asyncio` in order to abstract
77various kinds of communication channels.
78
79Transport objects are always instantiated by an
80:ref:`asyncio event loop <asyncio-event-loop>`.
81
82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
83The methods available on a transport depend on the transport's kind.
84
85The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
86
87
88Transports Hierarchy
89--------------------
90
91.. class:: BaseTransport
92
93   Base class for all transports.  Contains methods that all
94   asyncio transports share.
95
96.. class:: WriteTransport(BaseTransport)
97
98   A base transport for write-only connections.
99
100   Instances of the *WriteTransport* class are returned from
101   the :meth:`loop.connect_write_pipe` event loop method and
102   are also used by subprocess-related methods like
103   :meth:`loop.subprocess_exec`.
104
105.. class:: ReadTransport(BaseTransport)
106
107   A base transport for read-only connections.
108
109   Instances of the *ReadTransport* class are returned from
110   the :meth:`loop.connect_read_pipe` event loop method and
111   are also used by subprocess-related methods like
112   :meth:`loop.subprocess_exec`.
113
114.. class:: Transport(WriteTransport, ReadTransport)
115
116   Interface representing a bidirectional transport, such as a
117   TCP connection.
118
119   The user does not instantiate a transport directly; they call a
120   utility function, passing it a protocol factory and other
121   information necessary to create the transport and protocol.
122
123   Instances of the *Transport* class are returned from or used by
124   event loop methods like :meth:`loop.create_connection`,
125   :meth:`loop.create_unix_connection`,
126   :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.
127
128
129.. class:: DatagramTransport(BaseTransport)
130
131   A transport for datagram (UDP) connections.
132
133   Instances of the *DatagramTransport* class are returned from
134   the :meth:`loop.create_datagram_endpoint` event loop method.
135
136
137.. class:: SubprocessTransport(BaseTransport)
138
139   An abstraction to represent a connection between a parent and its
140   child OS process.
141
142   Instances of the *SubprocessTransport* class are returned from
143   event loop methods :meth:`loop.subprocess_shell` and
144   :meth:`loop.subprocess_exec`.
145
146
147Base Transport
148--------------
149
150.. method:: BaseTransport.close()
151
152   Close the transport.
153
154   If the transport has a buffer for outgoing
155   data, buffered data will be flushed asynchronously.  No more data
156   will be received.  After all buffered data is flushed, the
157   protocol's :meth:`protocol.connection_lost()
158   <BaseProtocol.connection_lost>` method will be called with
159   :const:`None` as its argument.
160
161.. method:: BaseTransport.is_closing()
162
163   Return ``True`` if the transport is closing or is closed.
164
165.. method:: BaseTransport.get_extra_info(name, default=None)
166
167   Return information about the transport or underlying resources
168   it uses.
169
170   *name* is a string representing the piece of transport-specific
171   information to get.
172
173   *default* is the value to return if the information is not
174   available, or if the transport does not support querying it
175   with the given third-party event loop implementation or on the
176   current platform.
177
178   For example, the following code attempts to get the underlying
179   socket object of the transport::
180
181      sock = transport.get_extra_info('socket')
182      if sock is not None:
183          print(sock.getsockopt(...))
184
185   Categories of information that can be queried on some transports:
186
187   * socket:
188
189     - ``'peername'``: the remote address to which the socket is
190       connected, result of :meth:`socket.socket.getpeername`
191       (``None`` on error)
192
193     - ``'socket'``: :class:`socket.socket` instance
194
195     - ``'sockname'``: the socket's own address,
196       result of :meth:`socket.socket.getsockname`
197
198   * SSL socket:
199
200     - ``'compression'``: the compression algorithm being used as a
201       string, or ``None`` if the connection isn't compressed; result
202       of :meth:`ssl.SSLSocket.compression`
203
204     - ``'cipher'``: a three-value tuple containing the name of the
205       cipher being used, the version of the SSL protocol that defines
206       its use, and the number of secret bits being used; result of
207       :meth:`ssl.SSLSocket.cipher`
208
209     - ``'peercert'``: peer certificate; result of
210       :meth:`ssl.SSLSocket.getpeercert`
211
212     - ``'sslcontext'``: :class:`ssl.SSLContext` instance
213
214     - ``'ssl_object'``: :class:`ssl.SSLObject` or
215       :class:`ssl.SSLSocket` instance
216
217   * pipe:
218
219     - ``'pipe'``: pipe object
220
221   * subprocess:
222
223     - ``'subprocess'``: :class:`subprocess.Popen` instance
224
225.. method:: BaseTransport.set_protocol(protocol)
226
227   Set a new protocol.
228
229   Switching protocol should only be done when both
230   protocols are documented to support the switch.
231
232.. method:: BaseTransport.get_protocol()
233
234   Return the current protocol.
235
236
237Read-only Transports
238--------------------
239
240.. method:: ReadTransport.is_reading()
241
242   Return ``True`` if the transport is receiving new data.
243
244   .. versionadded:: 3.7
245
246.. method:: ReadTransport.pause_reading()
247
248   Pause the receiving end of the transport.  No data will be passed to
249   the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
250   method until :meth:`resume_reading` is called.
251
252   .. versionchanged:: 3.7
253      The method is idempotent, i.e. it can be called when the
254      transport is already paused or closed.
255
256.. method:: ReadTransport.resume_reading()
257
258   Resume the receiving end.  The protocol's
259   :meth:`protocol.data_received() <Protocol.data_received>` method
260   will be called once again if some data is available for reading.
261
262   .. versionchanged:: 3.7
263      The method is idempotent, i.e. it can be called when the
264      transport is already reading.
265
266
267Write-only Transports
268---------------------
269
270.. method:: WriteTransport.abort()
271
272   Close the transport immediately, without waiting for pending operations
273   to complete.  Buffered data will be lost.  No more data will be received.
274   The protocol's :meth:`protocol.connection_lost()
275   <BaseProtocol.connection_lost>` method will eventually be
276   called with :const:`None` as its argument.
277
278.. method:: WriteTransport.can_write_eof()
279
280   Return :const:`True` if the transport supports
281   :meth:`~WriteTransport.write_eof`, :const:`False` if not.
282
283.. method:: WriteTransport.get_write_buffer_size()
284
285   Return the current size of the output buffer used by the transport.
286
287.. method:: WriteTransport.get_write_buffer_limits()
288
289   Get the *high* and *low* watermarks for write flow control. Return a
290   tuple ``(low, high)`` where *low* and *high* are positive number of
291   bytes.
292
293   Use :meth:`set_write_buffer_limits` to set the limits.
294
295   .. versionadded:: 3.4.2
296
297.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)
298
299   Set the *high* and *low* watermarks for write flow control.
300
301   These two values (measured in number of
302   bytes) control when the protocol's
303   :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
304   and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
305   methods are called. If specified, the low watermark must be less
306   than or equal to the high watermark.  Neither *high* nor *low*
307   can be negative.
308
309   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
310   becomes greater than or equal to the *high* value. If writing has
311   been paused, :meth:`~BaseProtocol.resume_writing` is called when
312   the buffer size becomes less than or equal to the *low* value.
313
314   The defaults are implementation-specific.  If only the
315   high watermark is given, the low watermark defaults to an
316   implementation-specific value less than or equal to the
317   high watermark.  Setting *high* to zero forces *low* to zero as
318   well, and causes :meth:`~BaseProtocol.pause_writing` to be called
319   whenever the buffer becomes non-empty.  Setting *low* to zero causes
320   :meth:`~BaseProtocol.resume_writing` to be called only once the
321   buffer is empty. Use of zero for either limit is generally
322   sub-optimal as it reduces opportunities for doing I/O and
323   computation concurrently.
324
325   Use :meth:`~WriteTransport.get_write_buffer_limits`
326   to get the limits.
327
328.. method:: WriteTransport.write(data)
329
330   Write some *data* bytes to the transport.
331
332   This method does not block; it buffers the data and arranges for it
333   to be sent out asynchronously.
334
335.. method:: WriteTransport.writelines(list_of_data)
336
337   Write a list (or any iterable) of data bytes to the transport.
338   This is functionally equivalent to calling :meth:`write` on each
339   element yielded by the iterable, but may be implemented more
340   efficiently.
341
342.. method:: WriteTransport.write_eof()
343
344   Close the write end of the transport after flushing all buffered data.
345   Data may still be received.
346
347   This method can raise :exc:`NotImplementedError` if the transport
348   (e.g. SSL) doesn't support half-closed connections.
349
350
351Datagram Transports
352-------------------
353
354.. method:: DatagramTransport.sendto(data, addr=None)
355
356   Send the *data* bytes to the remote peer given by *addr* (a
357   transport-dependent target address).  If *addr* is :const:`None`,
358   the data is sent to the target address given on transport
359   creation.
360
361   This method does not block; it buffers the data and arranges
362   for it to be sent out asynchronously.
363
364.. method:: DatagramTransport.abort()
365
366   Close the transport immediately, without waiting for pending
367   operations to complete.  Buffered data will be lost.
368   No more data will be received.  The protocol's
369   :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
370   method will eventually be called with :const:`None` as its argument.
371
372
373.. _asyncio-subprocess-transports:
374
375Subprocess Transports
376---------------------
377
378.. method:: SubprocessTransport.get_pid()
379
380   Return the subprocess process id as an integer.
381
382.. method:: SubprocessTransport.get_pipe_transport(fd)
383
384   Return the transport for the communication pipe corresponding to the
385   integer file descriptor *fd*:
386
387   * ``0``: readable streaming transport of the standard input (*stdin*),
388     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
389   * ``1``: writable streaming transport of the standard output (*stdout*),
390     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
391   * ``2``: writable streaming transport of the standard error (*stderr*),
392     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
393   * other *fd*: :const:`None`
394
395.. method:: SubprocessTransport.get_returncode()
396
397   Return the subprocess return code as an integer or :const:`None`
398   if it hasn't returned, which is similar to the
399   :attr:`subprocess.Popen.returncode` attribute.
400
401.. method:: SubprocessTransport.kill()
402
403   Kill the subprocess.
404
405   On POSIX systems, the function sends SIGKILL to the subprocess.
406   On Windows, this method is an alias for :meth:`terminate`.
407
408   See also :meth:`subprocess.Popen.kill`.
409
410.. method:: SubprocessTransport.send_signal(signal)
411
412   Send the *signal* number to the subprocess, as in
413   :meth:`subprocess.Popen.send_signal`.
414
415.. method:: SubprocessTransport.terminate()
416
417   Stop the subprocess.
418
419   On POSIX systems, this method sends SIGTERM to the subprocess.
420   On Windows, the Windows API function TerminateProcess() is called to
421   stop the subprocess.
422
423   See also :meth:`subprocess.Popen.terminate`.
424
425.. method:: SubprocessTransport.close()
426
427   Kill the subprocess by calling the :meth:`kill` method.
428
429   If the subprocess hasn't returned yet, and close transports of
430   *stdin*, *stdout*, and *stderr* pipes.
431
432
433.. _asyncio-protocol:
434
435Protocols
436=========
437
438**Source code:** :source:`Lib/asyncio/protocols.py`
439
440---------------------------------------------------
441
442asyncio provides a set of abstract base classes that should be used
443to implement network protocols.  Those classes are meant to be used
444together with :ref:`transports <asyncio-transport>`.
445
446Subclasses of abstract base protocol classes may implement some or
447all methods.  All these methods are callbacks: they are called by
448transports on certain events, for example when some data is received.
449A base protocol method should be called by the corresponding transport.
450
451
452Base Protocols
453--------------
454
455.. class:: BaseProtocol
456
457   Base protocol with methods that all protocols share.
458
459.. class:: Protocol(BaseProtocol)
460
461   The base class for implementing streaming protocols
462   (TCP, Unix sockets, etc).
463
464.. class:: BufferedProtocol(BaseProtocol)
465
466   A base class for implementing streaming protocols with manual
467   control of the receive buffer.
468
469.. class:: DatagramProtocol(BaseProtocol)
470
471   The base class for implementing datagram (UDP) protocols.
472
473.. class:: SubprocessProtocol(BaseProtocol)
474
475   The base class for implementing protocols communicating with child
476   processes (unidirectional pipes).
477
478
479Base Protocol
480-------------
481
482All asyncio protocols can implement Base Protocol callbacks.
483
484.. rubric:: Connection Callbacks
485
486Connection callbacks are called on all protocols, exactly once per
487a successful connection.  All other protocol callbacks can only be
488called between those two methods.
489
490.. method:: BaseProtocol.connection_made(transport)
491
492   Called when a connection is made.
493
494   The *transport* argument is the transport representing the
495   connection.  The protocol is responsible for storing the reference
496   to its transport.
497
498.. method:: BaseProtocol.connection_lost(exc)
499
500   Called when the connection is lost or closed.
501
502   The argument is either an exception object or :const:`None`.
503   The latter means a regular EOF is received, or the connection was
504   aborted or closed by this side of the connection.
505
506
507.. rubric:: Flow Control Callbacks
508
509Flow control callbacks can be called by transports to pause or
510resume writing performed by the protocol.
511
512See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
513method for more details.
514
515.. method:: BaseProtocol.pause_writing()
516
517   Called when the transport's buffer goes over the high watermark.
518
519.. method:: BaseProtocol.resume_writing()
520
521   Called when the transport's buffer drains below the low watermark.
522
523If the buffer size equals the high watermark,
524:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
525go strictly over.
526
527Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
528buffer size is equal or lower than the low watermark.  These end
529conditions are important to ensure that things go as expected when
530either mark is zero.
531
532
533Streaming Protocols
534-------------------
535
536Event methods, such as :meth:`loop.create_server`,
537:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
538:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
539:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
540accept factories that return streaming protocols.
541
542.. method:: Protocol.data_received(data)
543
544   Called when some data is received.  *data* is a non-empty bytes
545   object containing the incoming data.
546
547   Whether the data is buffered, chunked or reassembled depends on
548   the transport.  In general, you shouldn't rely on specific semantics
549   and instead make your parsing generic and flexible. However,
550   data is always received in the correct order.
551
552   The method can be called an arbitrary number of times while
553   a connection is open.
554
555   However, :meth:`protocol.eof_received() <Protocol.eof_received>`
556   is called at most once.  Once `eof_received()` is called,
557   ``data_received()`` is not called anymore.
558
559.. method:: Protocol.eof_received()
560
561   Called when the other end signals it won't send any more data
562   (for example by calling :meth:`transport.write_eof()
563   <WriteTransport.write_eof>`, if the other end also uses
564   asyncio).
565
566   This method may return a false value (including ``None``), in which case
567   the transport will close itself.  Conversely, if this method returns a
568   true value, the protocol used determines whether to close the transport.
569   Since the default implementation returns ``None``, it implicitly closes the
570   connection.
571
572   Some transports, including SSL, don't support half-closed connections,
573   in which case returning true from this method will result in the connection
574   being closed.
575
576
577State machine:
578
579.. code-block:: none
580
581    start -> connection_made
582        [-> data_received]*
583        [-> eof_received]?
584    -> connection_lost -> end
585
586
587Buffered Streaming Protocols
588----------------------------
589
590.. versionadded:: 3.7
591   **Important:** this has been added to asyncio in Python 3.7
592   *on a provisional basis*!  This is as an experimental API that
593   might be changed or removed completely in Python 3.8.
594
595Buffered Protocols can be used with any event loop method
596that supports `Streaming Protocols`_.
597
598``BufferedProtocol`` implementations allow explicit manual allocation
599and control of the receive buffer.  Event loops can then use the buffer
600provided by the protocol to avoid unnecessary data copies.  This
601can result in noticeable performance improvement for protocols that
602receive big amounts of data.  Sophisticated protocol implementations
603can significantly reduce the number of buffer allocations.
604
605The following callbacks are called on :class:`BufferedProtocol`
606instances:
607
608.. method:: BufferedProtocol.get_buffer(sizehint)
609
610   Called to allocate a new receive buffer.
611
612   *sizehint* is the recommended minimum size for the returned
613   buffer.  It is acceptable to return smaller or larger buffers
614   than what *sizehint* suggests.  When set to -1, the buffer size
615   can be arbitrary. It is an error to return a buffer with a zero size.
616
617   ``get_buffer()`` must return an object implementing the
618   :ref:`buffer protocol <bufferobjects>`.
619
620.. method:: BufferedProtocol.buffer_updated(nbytes)
621
622   Called when the buffer was updated with the received data.
623
624   *nbytes* is the total number of bytes that were written to the buffer.
625
626.. method:: BufferedProtocol.eof_received()
627
628   See the documentation of the :meth:`protocol.eof_received()
629   <Protocol.eof_received>` method.
630
631
632:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
633of times during a connection.  However, :meth:`protocol.eof_received()
634<Protocol.eof_received>` is called at most once
635and, if called, :meth:`~BufferedProtocol.get_buffer` and
636:meth:`~BufferedProtocol.buffer_updated` won't be called after it.
637
638State machine:
639
640.. code-block:: none
641
642    start -> connection_made
643        [-> get_buffer
644            [-> buffer_updated]?
645        ]*
646        [-> eof_received]?
647    -> connection_lost -> end
648
649
650Datagram Protocols
651------------------
652
653Datagram Protocol instances should be constructed by protocol
654factories passed to the :meth:`loop.create_datagram_endpoint` method.
655
656.. method:: DatagramProtocol.datagram_received(data, addr)
657
658   Called when a datagram is received.  *data* is a bytes object containing
659   the incoming data.  *addr* is the address of the peer sending the data;
660   the exact format depends on the transport.
661
662.. method:: DatagramProtocol.error_received(exc)
663
664   Called when a previous send or receive operation raises an
665   :class:`OSError`.  *exc* is the :class:`OSError` instance.
666
667   This method is called in rare conditions, when the transport (e.g. UDP)
668   detects that a datagram could not be delivered to its recipient.
669   In many conditions though, undeliverable datagrams will be silently
670   dropped.
671
672.. note::
673
674   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
675   for datagram protocols, because there is no reliable way to detect send
676   failures caused by writing too many packets.
677
678   The socket always appears 'ready' and excess packets are dropped. An
679   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
680   or may not be raised; if it is raised, it will be reported to
681   :meth:`DatagramProtocol.error_received` but otherwise ignored.
682
683
684.. _asyncio-subprocess-protocols:
685
686Subprocess Protocols
687--------------------
688
689Datagram Protocol instances should be constructed by protocol
690factories passed to the :meth:`loop.subprocess_exec` and
691:meth:`loop.subprocess_shell` methods.
692
693.. method:: SubprocessProtocol.pipe_data_received(fd, data)
694
695   Called when the child process writes data into its stdout or stderr
696   pipe.
697
698   *fd* is the integer file descriptor of the pipe.
699
700   *data* is a non-empty bytes object containing the received data.
701
702.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
703
704   Called when one of the pipes communicating with the child process
705   is closed.
706
707   *fd* is the integer file descriptor that was closed.
708
709.. method:: SubprocessProtocol.process_exited()
710
711   Called when the child process has exited.
712
713
714Examples
715========
716
717.. _asyncio_example_tcp_echo_server_protocol:
718
719TCP Echo Server
720---------------
721
722Create a TCP echo server using the :meth:`loop.create_server` method, send back
723received data, and close the connection::
724
725    import asyncio
726
727
728    class EchoServerProtocol(asyncio.Protocol):
729        def connection_made(self, transport):
730            peername = transport.get_extra_info('peername')
731            print('Connection from {}'.format(peername))
732            self.transport = transport
733
734        def data_received(self, data):
735            message = data.decode()
736            print('Data received: {!r}'.format(message))
737
738            print('Send: {!r}'.format(message))
739            self.transport.write(data)
740
741            print('Close the client socket')
742            self.transport.close()
743
744
745    async def main():
746        # Get a reference to the event loop as we plan to use
747        # low-level APIs.
748        loop = asyncio.get_running_loop()
749
750        server = await loop.create_server(
751            lambda: EchoServerProtocol(),
752            '127.0.0.1', 8888)
753
754        async with server:
755            await server.serve_forever()
756
757
758    asyncio.run(main())
759
760
761.. seealso::
762
763   The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
764   example uses the high-level :func:`asyncio.start_server` function.
765
766.. _asyncio_example_tcp_echo_client_protocol:
767
768TCP Echo Client
769---------------
770
771A TCP echo client using the :meth:`loop.create_connection` method, sends
772data, and waits until the connection is closed::
773
774    import asyncio
775
776
777    class EchoClientProtocol(asyncio.Protocol):
778        def __init__(self, message, on_con_lost):
779            self.message = message
780            self.on_con_lost = on_con_lost
781
782        def connection_made(self, transport):
783            transport.write(self.message.encode())
784            print('Data sent: {!r}'.format(self.message))
785
786        def data_received(self, data):
787            print('Data received: {!r}'.format(data.decode()))
788
789        def connection_lost(self, exc):
790            print('The server closed the connection')
791            self.on_con_lost.set_result(True)
792
793
794    async def main():
795        # Get a reference to the event loop as we plan to use
796        # low-level APIs.
797        loop = asyncio.get_running_loop()
798
799        on_con_lost = loop.create_future()
800        message = 'Hello World!'
801
802        transport, protocol = await loop.create_connection(
803            lambda: EchoClientProtocol(message, on_con_lost),
804            '127.0.0.1', 8888)
805
806        # Wait until the protocol signals that the connection
807        # is lost and close the transport.
808        try:
809            await on_con_lost
810        finally:
811            transport.close()
812
813
814    asyncio.run(main())
815
816
817.. seealso::
818
819   The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
820   example uses the high-level :func:`asyncio.open_connection` function.
821
822
823.. _asyncio-udp-echo-server-protocol:
824
825UDP Echo Server
826---------------
827
828A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
829method, sends back received data::
830
831    import asyncio
832
833
834    class EchoServerProtocol:
835        def connection_made(self, transport):
836            self.transport = transport
837
838        def datagram_received(self, data, addr):
839            message = data.decode()
840            print('Received %r from %s' % (message, addr))
841            print('Send %r to %s' % (message, addr))
842            self.transport.sendto(data, addr)
843
844
845    async def main():
846        print("Starting UDP server")
847
848        # Get a reference to the event loop as we plan to use
849        # low-level APIs.
850        loop = asyncio.get_running_loop()
851
852        # One protocol instance will be created to serve all
853        # client requests.
854        transport, protocol = await loop.create_datagram_endpoint(
855            lambda: EchoServerProtocol(),
856            local_addr=('127.0.0.1', 9999))
857
858        try:
859            await asyncio.sleep(3600)  # Serve for 1 hour.
860        finally:
861            transport.close()
862
863
864    asyncio.run(main())
865
866
867.. _asyncio-udp-echo-client-protocol:
868
869UDP Echo Client
870---------------
871
872A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
873method, sends data and closes the transport when it receives the answer::
874
875    import asyncio
876
877
878    class EchoClientProtocol:
879        def __init__(self, message, on_con_lost):
880            self.message = message
881            self.on_con_lost = on_con_lost
882            self.transport = None
883
884        def connection_made(self, transport):
885            self.transport = transport
886            print('Send:', self.message)
887            self.transport.sendto(self.message.encode())
888
889        def datagram_received(self, data, addr):
890            print("Received:", data.decode())
891
892            print("Close the socket")
893            self.transport.close()
894
895        def error_received(self, exc):
896            print('Error received:', exc)
897
898        def connection_lost(self, exc):
899            print("Connection closed")
900            self.on_con_lost.set_result(True)
901
902
903    async def main():
904        # Get a reference to the event loop as we plan to use
905        # low-level APIs.
906        loop = asyncio.get_running_loop()
907
908        on_con_lost = loop.create_future()
909        message = "Hello World!"
910
911        transport, protocol = await loop.create_datagram_endpoint(
912            lambda: EchoClientProtocol(message, on_con_lost),
913            remote_addr=('127.0.0.1', 9999))
914
915        try:
916            await on_con_lost
917        finally:
918            transport.close()
919
920
921    asyncio.run(main())
922
923
924.. _asyncio_example_create_connection:
925
926Connecting Existing Sockets
927---------------------------
928
929Wait until a socket receives data using the
930:meth:`loop.create_connection` method with a protocol::
931
932    import asyncio
933    import socket
934
935
936    class MyProtocol(asyncio.Protocol):
937
938        def __init__(self, on_con_lost):
939            self.transport = None
940            self.on_con_lost = on_con_lost
941
942        def connection_made(self, transport):
943            self.transport = transport
944
945        def data_received(self, data):
946            print("Received:", data.decode())
947
948            # We are done: close the transport;
949            # connection_lost() will be called automatically.
950            self.transport.close()
951
952        def connection_lost(self, exc):
953            # The socket has been closed
954            self.on_con_lost.set_result(True)
955
956
957    async def main():
958        # Get a reference to the event loop as we plan to use
959        # low-level APIs.
960        loop = asyncio.get_running_loop()
961        on_con_lost = loop.create_future()
962
963        # Create a pair of connected sockets
964        rsock, wsock = socket.socketpair()
965
966        # Register the socket to wait for data.
967        transport, protocol = await loop.create_connection(
968            lambda: MyProtocol(on_con_lost), sock=rsock)
969
970        # Simulate the reception of data from the network.
971        loop.call_soon(wsock.send, 'abc'.encode())
972
973        try:
974            await protocol.on_con_lost
975        finally:
976            transport.close()
977            wsock.close()
978
979    asyncio.run(main())
980
981.. seealso::
982
983   The :ref:`watch a file descriptor for read events
984   <asyncio_example_watch_fd>` example uses the low-level
985   :meth:`loop.add_reader` method to register an FD.
986
987   The :ref:`register an open socket to wait for data using streams
988   <asyncio_example_create_connection-streams>` example uses high-level streams
989   created by the :func:`open_connection` function in a coroutine.
990
991.. _asyncio_example_subprocess_proto:
992
993loop.subprocess_exec() and SubprocessProtocol
994---------------------------------------------
995
996An example of a subprocess protocol used to get the output of a
997subprocess and to wait for the subprocess exit.
998
999The subprocess is created by the :meth:`loop.subprocess_exec` method::
1000
1001    import asyncio
1002    import sys
1003
1004    class DateProtocol(asyncio.SubprocessProtocol):
1005        def __init__(self, exit_future):
1006            self.exit_future = exit_future
1007            self.output = bytearray()
1008
1009        def pipe_data_received(self, fd, data):
1010            self.output.extend(data)
1011
1012        def process_exited(self):
1013            self.exit_future.set_result(True)
1014
1015    async def get_date():
1016        # Get a reference to the event loop as we plan to use
1017        # low-level APIs.
1018        loop = asyncio.get_running_loop()
1019
1020        code = 'import datetime; print(datetime.datetime.now())'
1021        exit_future = asyncio.Future(loop=loop)
1022
1023        # Create the subprocess controlled by DateProtocol;
1024        # redirect the standard output into a pipe.
1025        transport, protocol = await loop.subprocess_exec(
1026            lambda: DateProtocol(exit_future),
1027            sys.executable, '-c', code,
1028            stdin=None, stderr=None)
1029
1030        # Wait for the subprocess exit using the process_exited()
1031        # method of the protocol.
1032        await exit_future
1033
1034        # Close the stdout pipe.
1035        transport.close()
1036
1037        # Read the output which was collected by the
1038        # pipe_data_received() method of the protocol.
1039        data = bytes(protocol.output)
1040        return data.decode('ascii').rstrip()
1041
1042    date = asyncio.run(get_date())
1043    print(f"Current date: {date}")
1044
1045See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
1046written using high-level APIs.
1047