• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1.. currentmodule:: asyncio
2
3
4.. _asyncio-transports-protocols:
5
6
7========================
8Transports and Protocols
9========================
10
11.. rubric:: Preface
12
13Transports and Protocols are used by the **low-level** event loop
14APIs such as :meth:`loop.create_connection`.  They use
15callback-based programming style and enable high-performance
16implementations of network or IPC protocols (e.g. HTTP).
17
18Essentially, transports and protocols should only be used in
19libraries and frameworks and never in high-level asyncio
20applications.
21
22This documentation page covers both `Transports`_ and `Protocols`_.
23
24.. rubric:: Introduction
25
26At the highest level, the transport is concerned with *how* bytes
27are transmitted, while the protocol determines *which* bytes to
28transmit (and to some extent when).
29
30A different way of saying the same thing: a transport is an
31abstraction for a socket (or similar I/O endpoint) while a protocol
32is an abstraction for an application, from the transport's point
33of view.
34
35Yet another view is the transport and protocol interfaces
36together define an abstract interface for using network I/O and
37interprocess I/O.
38
39There is always a 1:1 relationship between transport and protocol
40objects: the protocol calls transport methods to send data,
41while the transport calls protocol methods to pass it data that
42has been received.
43
44Most of connection oriented event loop methods
45(such as :meth:`loop.create_connection`) usually accept a
46*protocol_factory* argument used to create a *Protocol* object
47for an accepted connection, represented by a *Transport* object.
48Such methods usually return a tuple of ``(transport, protocol)``.
49
50.. rubric:: Contents
51
52This documentation page contains the following sections:
53
54* The `Transports`_ section documents asyncio :class:`BaseTransport`,
55  :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
56  :class:`DatagramTransport`, and :class:`SubprocessTransport`
57  classes.
58
59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
60  :class:`Protocol`, :class:`BufferedProtocol`,
61  :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.
62
63* The `Examples`_ section showcases how to work with transports,
64  protocols, and low-level event loop APIs.
65
66
67.. _asyncio-transport:
68
69Transports
70==========
71
72**Source code:** :source:`Lib/asyncio/transports.py`
73
74----------------------------------------------------
75
76Transports are classes provided by :mod:`asyncio` in order to abstract
77various kinds of communication channels.
78
79Transport objects are always instantiated by an
80:ref:`asyncio event loop <asyncio-event-loop>`.
81
82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
83The methods available on a transport depend on the transport's kind.
84
85The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
86
87
88Transports Hierarchy
89--------------------
90
91.. class:: BaseTransport
92
93   Base class for all transports.  Contains methods that all
94   asyncio transports share.
95
96.. class:: WriteTransport(BaseTransport)
97
98   A base transport for write-only connections.
99
100   Instances of the *WriteTransport* class are returned from
101   the :meth:`loop.connect_write_pipe` event loop method and
102   are also used by subprocess-related methods like
103   :meth:`loop.subprocess_exec`.
104
105.. class:: ReadTransport(BaseTransport)
106
107   A base transport for read-only connections.
108
109   Instances of the *ReadTransport* class are returned from
110   the :meth:`loop.connect_read_pipe` event loop method and
111   are also used by subprocess-related methods like
112   :meth:`loop.subprocess_exec`.
113
114.. class:: Transport(WriteTransport, ReadTransport)
115
116   Interface representing a bidirectional transport, such as a
117   TCP connection.
118
119   The user does not instantiate a transport directly; they call a
120   utility function, passing it a protocol factory and other
121   information necessary to create the transport and protocol.
122
123   Instances of the *Transport* class are returned from or used by
124   event loop methods like :meth:`loop.create_connection`,
125   :meth:`loop.create_unix_connection`,
126   :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.
127
128
129.. class:: DatagramTransport(BaseTransport)
130
131   A transport for datagram (UDP) connections.
132
133   Instances of the *DatagramTransport* class are returned from
134   the :meth:`loop.create_datagram_endpoint` event loop method.
135
136
137.. class:: SubprocessTransport(BaseTransport)
138
139   An abstraction to represent a connection between a parent and its
140   child OS process.
141
142   Instances of the *SubprocessTransport* class are returned from
143   event loop methods :meth:`loop.subprocess_shell` and
144   :meth:`loop.subprocess_exec`.
145
146
147Base Transport
148--------------
149
150.. method:: BaseTransport.close()
151
152   Close the transport.
153
154   If the transport has a buffer for outgoing
155   data, buffered data will be flushed asynchronously.  No more data
156   will be received.  After all buffered data is flushed, the
157   protocol's :meth:`protocol.connection_lost()
158   <BaseProtocol.connection_lost>` method will be called with
159   :const:`None` as its argument.
160
161.. method:: BaseTransport.is_closing()
162
163   Return ``True`` if the transport is closing or is closed.
164
165.. method:: BaseTransport.get_extra_info(name, default=None)
166
167   Return information about the transport or underlying resources
168   it uses.
169
170   *name* is a string representing the piece of transport-specific
171   information to get.
172
173   *default* is the value to return if the information is not
174   available, or if the transport does not support querying it
175   with the given third-party event loop implementation or on the
176   current platform.
177
178   For example, the following code attempts to get the underlying
179   socket object of the transport::
180
181      sock = transport.get_extra_info('socket')
182      if sock is not None:
183          print(sock.getsockopt(...))
184
185   Categories of information that can be queried on some transports:
186
187   * socket:
188
189     - ``'peername'``: the remote address to which the socket is
190       connected, result of :meth:`socket.socket.getpeername`
191       (``None`` on error)
192
193     - ``'socket'``: :class:`socket.socket` instance
194
195     - ``'sockname'``: the socket's own address,
196       result of :meth:`socket.socket.getsockname`
197
198   * SSL socket:
199
200     - ``'compression'``: the compression algorithm being used as a
201       string, or ``None`` if the connection isn't compressed; result
202       of :meth:`ssl.SSLSocket.compression`
203
204     - ``'cipher'``: a three-value tuple containing the name of the
205       cipher being used, the version of the SSL protocol that defines
206       its use, and the number of secret bits being used; result of
207       :meth:`ssl.SSLSocket.cipher`
208
209     - ``'peercert'``: peer certificate; result of
210       :meth:`ssl.SSLSocket.getpeercert`
211
212     - ``'sslcontext'``: :class:`ssl.SSLContext` instance
213
214     - ``'ssl_object'``: :class:`ssl.SSLObject` or
215       :class:`ssl.SSLSocket` instance
216
217   * pipe:
218
219     - ``'pipe'``: pipe object
220
221   * subprocess:
222
223     - ``'subprocess'``: :class:`subprocess.Popen` instance
224
225.. method:: BaseTransport.set_protocol(protocol)
226
227   Set a new protocol.
228
229   Switching protocol should only be done when both
230   protocols are documented to support the switch.
231
232.. method:: BaseTransport.get_protocol()
233
234   Return the current protocol.
235
236
237Read-only Transports
238--------------------
239
240.. method:: ReadTransport.is_reading()
241
242   Return ``True`` if the transport is receiving new data.
243
244   .. versionadded:: 3.7
245
246.. method:: ReadTransport.pause_reading()
247
248   Pause the receiving end of the transport.  No data will be passed to
249   the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
250   method until :meth:`resume_reading` is called.
251
252   .. versionchanged:: 3.7
253      The method is idempotent, i.e. it can be called when the
254      transport is already paused or closed.
255
256.. method:: ReadTransport.resume_reading()
257
258   Resume the receiving end.  The protocol's
259   :meth:`protocol.data_received() <Protocol.data_received>` method
260   will be called once again if some data is available for reading.
261
262   .. versionchanged:: 3.7
263      The method is idempotent, i.e. it can be called when the
264      transport is already reading.
265
266
267Write-only Transports
268---------------------
269
270.. method:: WriteTransport.abort()
271
272   Close the transport immediately, without waiting for pending operations
273   to complete.  Buffered data will be lost.  No more data will be received.
274   The protocol's :meth:`protocol.connection_lost()
275   <BaseProtocol.connection_lost>` method will eventually be
276   called with :const:`None` as its argument.
277
278.. method:: WriteTransport.can_write_eof()
279
280   Return :const:`True` if the transport supports
281   :meth:`~WriteTransport.write_eof`, :const:`False` if not.
282
283.. method:: WriteTransport.get_write_buffer_size()
284
285   Return the current size of the output buffer used by the transport.
286
287.. method:: WriteTransport.get_write_buffer_limits()
288
289   Get the *high* and *low* watermarks for write flow control. Return a
290   tuple ``(low, high)`` where *low* and *high* are positive number of
291   bytes.
292
293   Use :meth:`set_write_buffer_limits` to set the limits.
294
295   .. versionadded:: 3.4.2
296
297.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)
298
299   Set the *high* and *low* watermarks for write flow control.
300
301   These two values (measured in number of
302   bytes) control when the protocol's
303   :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
304   and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
305   methods are called. If specified, the low watermark must be less
306   than or equal to the high watermark.  Neither *high* nor *low*
307   can be negative.
308
309   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
310   becomes greater than or equal to the *high* value. If writing has
311   been paused, :meth:`~BaseProtocol.resume_writing` is called when
312   the buffer size becomes less than or equal to the *low* value.
313
314   The defaults are implementation-specific.  If only the
315   high watermark is given, the low watermark defaults to an
316   implementation-specific value less than or equal to the
317   high watermark.  Setting *high* to zero forces *low* to zero as
318   well, and causes :meth:`~BaseProtocol.pause_writing` to be called
319   whenever the buffer becomes non-empty.  Setting *low* to zero causes
320   :meth:`~BaseProtocol.resume_writing` to be called only once the
321   buffer is empty. Use of zero for either limit is generally
322   sub-optimal as it reduces opportunities for doing I/O and
323   computation concurrently.
324
325   Use :meth:`~WriteTransport.get_write_buffer_limits`
326   to get the limits.
327
328.. method:: WriteTransport.write(data)
329
330   Write some *data* bytes to the transport.
331
332   This method does not block; it buffers the data and arranges for it
333   to be sent out asynchronously.
334
335.. method:: WriteTransport.writelines(list_of_data)
336
337   Write a list (or any iterable) of data bytes to the transport.
338   This is functionally equivalent to calling :meth:`write` on each
339   element yielded by the iterable, but may be implemented more
340   efficiently.
341
342.. method:: WriteTransport.write_eof()
343
344   Close the write end of the transport after flushing all buffered data.
345   Data may still be received.
346
347   This method can raise :exc:`NotImplementedError` if the transport
348   (e.g. SSL) doesn't support half-closed connections.
349
350
351Datagram Transports
352-------------------
353
354.. method:: DatagramTransport.sendto(data, addr=None)
355
356   Send the *data* bytes to the remote peer given by *addr* (a
357   transport-dependent target address).  If *addr* is :const:`None`,
358   the data is sent to the target address given on transport
359   creation.
360
361   This method does not block; it buffers the data and arranges
362   for it to be sent out asynchronously.
363
364.. method:: DatagramTransport.abort()
365
366   Close the transport immediately, without waiting for pending
367   operations to complete.  Buffered data will be lost.
368   No more data will be received.  The protocol's
369   :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
370   method will eventually be called with :const:`None` as its argument.
371
372
373.. _asyncio-subprocess-transports:
374
375Subprocess Transports
376---------------------
377
378.. method:: SubprocessTransport.get_pid()
379
380   Return the subprocess process id as an integer.
381
382.. method:: SubprocessTransport.get_pipe_transport(fd)
383
384   Return the transport for the communication pipe corresponding to the
385   integer file descriptor *fd*:
386
387   * ``0``: readable streaming transport of the standard input (*stdin*),
388     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
389   * ``1``: writable streaming transport of the standard output (*stdout*),
390     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
391   * ``2``: writable streaming transport of the standard error (*stderr*),
392     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
393   * other *fd*: :const:`None`
394
395.. method:: SubprocessTransport.get_returncode()
396
397   Return the subprocess return code as an integer or :const:`None`
398   if it hasn't returned, which is similar to the
399   :attr:`subprocess.Popen.returncode` attribute.
400
401.. method:: SubprocessTransport.kill()
402
403   Kill the subprocess.
404
405   On POSIX systems, the function sends SIGKILL to the subprocess.
406   On Windows, this method is an alias for :meth:`terminate`.
407
408   See also :meth:`subprocess.Popen.kill`.
409
410.. method:: SubprocessTransport.send_signal(signal)
411
412   Send the *signal* number to the subprocess, as in
413   :meth:`subprocess.Popen.send_signal`.
414
415.. method:: SubprocessTransport.terminate()
416
417   Stop the subprocess.
418
419   On POSIX systems, this method sends SIGTERM to the subprocess.
420   On Windows, the Windows API function TerminateProcess() is called to
421   stop the subprocess.
422
423   See also :meth:`subprocess.Popen.terminate`.
424
425.. method:: SubprocessTransport.close()
426
427   Kill the subprocess by calling the :meth:`kill` method.
428
429   If the subprocess hasn't returned yet, and close transports of
430   *stdin*, *stdout*, and *stderr* pipes.
431
432
433.. _asyncio-protocol:
434
435Protocols
436=========
437
438**Source code:** :source:`Lib/asyncio/protocols.py`
439
440---------------------------------------------------
441
442asyncio provides a set of abstract base classes that should be used
443to implement network protocols.  Those classes are meant to be used
444together with :ref:`transports <asyncio-transport>`.
445
446Subclasses of abstract base protocol classes may implement some or
447all methods.  All these methods are callbacks: they are called by
448transports on certain events, for example when some data is received.
449A base protocol method should be called by the corresponding transport.
450
451
452Base Protocols
453--------------
454
455.. class:: BaseProtocol
456
457   Base protocol with methods that all protocols share.
458
459.. class:: Protocol(BaseProtocol)
460
461   The base class for implementing streaming protocols
462   (TCP, Unix sockets, etc).
463
464.. class:: BufferedProtocol(BaseProtocol)
465
466   A base class for implementing streaming protocols with manual
467   control of the receive buffer.
468
469.. class:: DatagramProtocol(BaseProtocol)
470
471   The base class for implementing datagram (UDP) protocols.
472
473.. class:: SubprocessProtocol(BaseProtocol)
474
475   The base class for implementing protocols communicating with child
476   processes (unidirectional pipes).
477
478
479Base Protocol
480-------------
481
482All asyncio protocols can implement Base Protocol callbacks.
483
484.. rubric:: Connection Callbacks
485
486Connection callbacks are called on all protocols, exactly once per
487a successful connection.  All other protocol callbacks can only be
488called between those two methods.
489
490.. method:: BaseProtocol.connection_made(transport)
491
492   Called when a connection is made.
493
494   The *transport* argument is the transport representing the
495   connection.  The protocol is responsible for storing the reference
496   to its transport.
497
498.. method:: BaseProtocol.connection_lost(exc)
499
500   Called when the connection is lost or closed.
501
502   The argument is either an exception object or :const:`None`.
503   The latter means a regular EOF is received, or the connection was
504   aborted or closed by this side of the connection.
505
506
507.. rubric:: Flow Control Callbacks
508
509Flow control callbacks can be called by transports to pause or
510resume writing performed by the protocol.
511
512See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
513method for more details.
514
515.. method:: BaseProtocol.pause_writing()
516
517   Called when the transport's buffer goes over the high watermark.
518
519.. method:: BaseProtocol.resume_writing()
520
521   Called when the transport's buffer drains below the low watermark.
522
523If the buffer size equals the high watermark,
524:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
525go strictly over.
526
527Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
528buffer size is equal or lower than the low watermark.  These end
529conditions are important to ensure that things go as expected when
530either mark is zero.
531
532
533Streaming Protocols
534-------------------
535
536Event methods, such as :meth:`loop.create_server`,
537:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
538:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
539:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
540accept factories that return streaming protocols.
541
542.. method:: Protocol.data_received(data)
543
544   Called when some data is received.  *data* is a non-empty bytes
545   object containing the incoming data.
546
547   Whether the data is buffered, chunked or reassembled depends on
548   the transport.  In general, you shouldn't rely on specific semantics
549   and instead make your parsing generic and flexible. However,
550   data is always received in the correct order.
551
552   The method can be called an arbitrary number of times while
553   a connection is open.
554
555   However, :meth:`protocol.eof_received() <Protocol.eof_received>`
556   is called at most once.  Once `eof_received()` is called,
557   ``data_received()`` is not called anymore.
558
559.. method:: Protocol.eof_received()
560
561   Called when the other end signals it won't send any more data
562   (for example by calling :meth:`transport.write_eof()
563   <WriteTransport.write_eof>`, if the other end also uses
564   asyncio).
565
566   This method may return a false value (including ``None``), in which case
567   the transport will close itself.  Conversely, if this method returns a
568   true value, the protocol used determines whether to close the transport.
569   Since the default implementation returns ``None``, it implicitly closes the
570   connection.
571
572   Some transports, including SSL, don't support half-closed connections,
573   in which case returning true from this method will result in the connection
574   being closed.
575
576
577State machine:
578
579.. code-block:: none
580
581    start -> connection_made
582        [-> data_received]*
583        [-> eof_received]?
584    -> connection_lost -> end
585
586
587Buffered Streaming Protocols
588----------------------------
589
590.. versionadded:: 3.7
591
592Buffered Protocols can be used with any event loop method
593that supports `Streaming Protocols`_.
594
595``BufferedProtocol`` implementations allow explicit manual allocation
596and control of the receive buffer.  Event loops can then use the buffer
597provided by the protocol to avoid unnecessary data copies.  This
598can result in noticeable performance improvement for protocols that
599receive big amounts of data.  Sophisticated protocol implementations
600can significantly reduce the number of buffer allocations.
601
602The following callbacks are called on :class:`BufferedProtocol`
603instances:
604
605.. method:: BufferedProtocol.get_buffer(sizehint)
606
607   Called to allocate a new receive buffer.
608
609   *sizehint* is the recommended minimum size for the returned
610   buffer.  It is acceptable to return smaller or larger buffers
611   than what *sizehint* suggests.  When set to -1, the buffer size
612   can be arbitrary. It is an error to return a buffer with a zero size.
613
614   ``get_buffer()`` must return an object implementing the
615   :ref:`buffer protocol <bufferobjects>`.
616
617.. method:: BufferedProtocol.buffer_updated(nbytes)
618
619   Called when the buffer was updated with the received data.
620
621   *nbytes* is the total number of bytes that were written to the buffer.
622
623.. method:: BufferedProtocol.eof_received()
624
625   See the documentation of the :meth:`protocol.eof_received()
626   <Protocol.eof_received>` method.
627
628
629:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
630of times during a connection.  However, :meth:`protocol.eof_received()
631<Protocol.eof_received>` is called at most once
632and, if called, :meth:`~BufferedProtocol.get_buffer` and
633:meth:`~BufferedProtocol.buffer_updated` won't be called after it.
634
635State machine:
636
637.. code-block:: none
638
639    start -> connection_made
640        [-> get_buffer
641            [-> buffer_updated]?
642        ]*
643        [-> eof_received]?
644    -> connection_lost -> end
645
646
647Datagram Protocols
648------------------
649
650Datagram Protocol instances should be constructed by protocol
651factories passed to the :meth:`loop.create_datagram_endpoint` method.
652
653.. method:: DatagramProtocol.datagram_received(data, addr)
654
655   Called when a datagram is received.  *data* is a bytes object containing
656   the incoming data.  *addr* is the address of the peer sending the data;
657   the exact format depends on the transport.
658
659.. method:: DatagramProtocol.error_received(exc)
660
661   Called when a previous send or receive operation raises an
662   :class:`OSError`.  *exc* is the :class:`OSError` instance.
663
664   This method is called in rare conditions, when the transport (e.g. UDP)
665   detects that a datagram could not be delivered to its recipient.
666   In many conditions though, undeliverable datagrams will be silently
667   dropped.
668
669.. note::
670
671   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
672   for datagram protocols, because there is no reliable way to detect send
673   failures caused by writing too many packets.
674
675   The socket always appears 'ready' and excess packets are dropped. An
676   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
677   or may not be raised; if it is raised, it will be reported to
678   :meth:`DatagramProtocol.error_received` but otherwise ignored.
679
680
681.. _asyncio-subprocess-protocols:
682
683Subprocess Protocols
684--------------------
685
686Datagram Protocol instances should be constructed by protocol
687factories passed to the :meth:`loop.subprocess_exec` and
688:meth:`loop.subprocess_shell` methods.
689
690.. method:: SubprocessProtocol.pipe_data_received(fd, data)
691
692   Called when the child process writes data into its stdout or stderr
693   pipe.
694
695   *fd* is the integer file descriptor of the pipe.
696
697   *data* is a non-empty bytes object containing the received data.
698
699.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
700
701   Called when one of the pipes communicating with the child process
702   is closed.
703
704   *fd* is the integer file descriptor that was closed.
705
706.. method:: SubprocessProtocol.process_exited()
707
708   Called when the child process has exited.
709
710
711Examples
712========
713
714.. _asyncio_example_tcp_echo_server_protocol:
715
716TCP Echo Server
717---------------
718
719Create a TCP echo server using the :meth:`loop.create_server` method, send back
720received data, and close the connection::
721
722    import asyncio
723
724
725    class EchoServerProtocol(asyncio.Protocol):
726        def connection_made(self, transport):
727            peername = transport.get_extra_info('peername')
728            print('Connection from {}'.format(peername))
729            self.transport = transport
730
731        def data_received(self, data):
732            message = data.decode()
733            print('Data received: {!r}'.format(message))
734
735            print('Send: {!r}'.format(message))
736            self.transport.write(data)
737
738            print('Close the client socket')
739            self.transport.close()
740
741
742    async def main():
743        # Get a reference to the event loop as we plan to use
744        # low-level APIs.
745        loop = asyncio.get_running_loop()
746
747        server = await loop.create_server(
748            lambda: EchoServerProtocol(),
749            '127.0.0.1', 8888)
750
751        async with server:
752            await server.serve_forever()
753
754
755    asyncio.run(main())
756
757
758.. seealso::
759
760   The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
761   example uses the high-level :func:`asyncio.start_server` function.
762
763.. _asyncio_example_tcp_echo_client_protocol:
764
765TCP Echo Client
766---------------
767
768A TCP echo client using the :meth:`loop.create_connection` method, sends
769data, and waits until the connection is closed::
770
771    import asyncio
772
773
774    class EchoClientProtocol(asyncio.Protocol):
775        def __init__(self, message, on_con_lost):
776            self.message = message
777            self.on_con_lost = on_con_lost
778
779        def connection_made(self, transport):
780            transport.write(self.message.encode())
781            print('Data sent: {!r}'.format(self.message))
782
783        def data_received(self, data):
784            print('Data received: {!r}'.format(data.decode()))
785
786        def connection_lost(self, exc):
787            print('The server closed the connection')
788            self.on_con_lost.set_result(True)
789
790
791    async def main():
792        # Get a reference to the event loop as we plan to use
793        # low-level APIs.
794        loop = asyncio.get_running_loop()
795
796        on_con_lost = loop.create_future()
797        message = 'Hello World!'
798
799        transport, protocol = await loop.create_connection(
800            lambda: EchoClientProtocol(message, on_con_lost),
801            '127.0.0.1', 8888)
802
803        # Wait until the protocol signals that the connection
804        # is lost and close the transport.
805        try:
806            await on_con_lost
807        finally:
808            transport.close()
809
810
811    asyncio.run(main())
812
813
814.. seealso::
815
816   The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
817   example uses the high-level :func:`asyncio.open_connection` function.
818
819
820.. _asyncio-udp-echo-server-protocol:
821
822UDP Echo Server
823---------------
824
825A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
826method, sends back received data::
827
828    import asyncio
829
830
831    class EchoServerProtocol:
832        def connection_made(self, transport):
833            self.transport = transport
834
835        def datagram_received(self, data, addr):
836            message = data.decode()
837            print('Received %r from %s' % (message, addr))
838            print('Send %r to %s' % (message, addr))
839            self.transport.sendto(data, addr)
840
841
842    async def main():
843        print("Starting UDP server")
844
845        # Get a reference to the event loop as we plan to use
846        # low-level APIs.
847        loop = asyncio.get_running_loop()
848
849        # One protocol instance will be created to serve all
850        # client requests.
851        transport, protocol = await loop.create_datagram_endpoint(
852            lambda: EchoServerProtocol(),
853            local_addr=('127.0.0.1', 9999))
854
855        try:
856            await asyncio.sleep(3600)  # Serve for 1 hour.
857        finally:
858            transport.close()
859
860
861    asyncio.run(main())
862
863
864.. _asyncio-udp-echo-client-protocol:
865
866UDP Echo Client
867---------------
868
869A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
870method, sends data and closes the transport when it receives the answer::
871
872    import asyncio
873
874
875    class EchoClientProtocol:
876        def __init__(self, message, on_con_lost):
877            self.message = message
878            self.on_con_lost = on_con_lost
879            self.transport = None
880
881        def connection_made(self, transport):
882            self.transport = transport
883            print('Send:', self.message)
884            self.transport.sendto(self.message.encode())
885
886        def datagram_received(self, data, addr):
887            print("Received:", data.decode())
888
889            print("Close the socket")
890            self.transport.close()
891
892        def error_received(self, exc):
893            print('Error received:', exc)
894
895        def connection_lost(self, exc):
896            print("Connection closed")
897            self.on_con_lost.set_result(True)
898
899
900    async def main():
901        # Get a reference to the event loop as we plan to use
902        # low-level APIs.
903        loop = asyncio.get_running_loop()
904
905        on_con_lost = loop.create_future()
906        message = "Hello World!"
907
908        transport, protocol = await loop.create_datagram_endpoint(
909            lambda: EchoClientProtocol(message, on_con_lost),
910            remote_addr=('127.0.0.1', 9999))
911
912        try:
913            await on_con_lost
914        finally:
915            transport.close()
916
917
918    asyncio.run(main())
919
920
921.. _asyncio_example_create_connection:
922
923Connecting Existing Sockets
924---------------------------
925
926Wait until a socket receives data using the
927:meth:`loop.create_connection` method with a protocol::
928
929    import asyncio
930    import socket
931
932
933    class MyProtocol(asyncio.Protocol):
934
935        def __init__(self, on_con_lost):
936            self.transport = None
937            self.on_con_lost = on_con_lost
938
939        def connection_made(self, transport):
940            self.transport = transport
941
942        def data_received(self, data):
943            print("Received:", data.decode())
944
945            # We are done: close the transport;
946            # connection_lost() will be called automatically.
947            self.transport.close()
948
949        def connection_lost(self, exc):
950            # The socket has been closed
951            self.on_con_lost.set_result(True)
952
953
954    async def main():
955        # Get a reference to the event loop as we plan to use
956        # low-level APIs.
957        loop = asyncio.get_running_loop()
958        on_con_lost = loop.create_future()
959
960        # Create a pair of connected sockets
961        rsock, wsock = socket.socketpair()
962
963        # Register the socket to wait for data.
964        transport, protocol = await loop.create_connection(
965            lambda: MyProtocol(on_con_lost), sock=rsock)
966
967        # Simulate the reception of data from the network.
968        loop.call_soon(wsock.send, 'abc'.encode())
969
970        try:
971            await protocol.on_con_lost
972        finally:
973            transport.close()
974            wsock.close()
975
976    asyncio.run(main())
977
978.. seealso::
979
980   The :ref:`watch a file descriptor for read events
981   <asyncio_example_watch_fd>` example uses the low-level
982   :meth:`loop.add_reader` method to register an FD.
983
984   The :ref:`register an open socket to wait for data using streams
985   <asyncio_example_create_connection-streams>` example uses high-level streams
986   created by the :func:`open_connection` function in a coroutine.
987
988.. _asyncio_example_subprocess_proto:
989
990loop.subprocess_exec() and SubprocessProtocol
991---------------------------------------------
992
993An example of a subprocess protocol used to get the output of a
994subprocess and to wait for the subprocess exit.
995
996The subprocess is created by the :meth:`loop.subprocess_exec` method::
997
998    import asyncio
999    import sys
1000
1001    class DateProtocol(asyncio.SubprocessProtocol):
1002        def __init__(self, exit_future):
1003            self.exit_future = exit_future
1004            self.output = bytearray()
1005
1006        def pipe_data_received(self, fd, data):
1007            self.output.extend(data)
1008
1009        def process_exited(self):
1010            self.exit_future.set_result(True)
1011
1012    async def get_date():
1013        # Get a reference to the event loop as we plan to use
1014        # low-level APIs.
1015        loop = asyncio.get_running_loop()
1016
1017        code = 'import datetime; print(datetime.datetime.now())'
1018        exit_future = asyncio.Future(loop=loop)
1019
1020        # Create the subprocess controlled by DateProtocol;
1021        # redirect the standard output into a pipe.
1022        transport, protocol = await loop.subprocess_exec(
1023            lambda: DateProtocol(exit_future),
1024            sys.executable, '-c', code,
1025            stdin=None, stderr=None)
1026
1027        # Wait for the subprocess exit using the process_exited()
1028        # method of the protocol.
1029        await exit_future
1030
1031        # Close the stdout pipe.
1032        transport.close()
1033
1034        # Read the output which was collected by the
1035        # pipe_data_received() method of the protocol.
1036        data = bytes(protocol.output)
1037        return data.decode('ascii').rstrip()
1038
1039    date = asyncio.run(get_date())
1040    print(f"Current date: {date}")
1041
1042See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
1043written using high-level APIs.
1044