1.. currentmodule:: asyncio 2 3 4.. _asyncio-transports-protocols: 5 6 7======================== 8Transports and Protocols 9======================== 10 11.. rubric:: Preface 12 13Transports and Protocols are used by the **low-level** event loop 14APIs such as :meth:`loop.create_connection`. They use 15callback-based programming style and enable high-performance 16implementations of network or IPC protocols (e.g. HTTP). 17 18Essentially, transports and protocols should only be used in 19libraries and frameworks and never in high-level asyncio 20applications. 21 22This documentation page covers both `Transports`_ and `Protocols`_. 23 24.. rubric:: Introduction 25 26At the highest level, the transport is concerned with *how* bytes 27are transmitted, while the protocol determines *which* bytes to 28transmit (and to some extent when). 29 30A different way of saying the same thing: a transport is an 31abstraction for a socket (or similar I/O endpoint) while a protocol 32is an abstraction for an application, from the transport's point 33of view. 34 35Yet another view is the transport and protocol interfaces 36together define an abstract interface for using network I/O and 37interprocess I/O. 38 39There is always a 1:1 relationship between transport and protocol 40objects: the protocol calls transport methods to send data, 41while the transport calls protocol methods to pass it data that 42has been received. 43 44Most of connection oriented event loop methods 45(such as :meth:`loop.create_connection`) usually accept a 46*protocol_factory* argument used to create a *Protocol* object 47for an accepted connection, represented by a *Transport* object. 48Such methods usually return a tuple of ``(transport, protocol)``. 49 50.. rubric:: Contents 51 52This documentation page contains the following sections: 53 54* The `Transports`_ section documents asyncio :class:`BaseTransport`, 55 :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`, 56 :class:`DatagramTransport`, and :class:`SubprocessTransport` 57 classes. 58 59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`, 60 :class:`Protocol`, :class:`BufferedProtocol`, 61 :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes. 62 63* The `Examples`_ section showcases how to work with transports, 64 protocols, and low-level event loop APIs. 65 66 67.. _asyncio-transport: 68 69Transports 70========== 71 72**Source code:** :source:`Lib/asyncio/transports.py` 73 74---------------------------------------------------- 75 76Transports are classes provided by :mod:`asyncio` in order to abstract 77various kinds of communication channels. 78 79Transport objects are always instantiated by an 80:ref:`asyncio event loop <asyncio-event-loop>`. 81 82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes. 83The methods available on a transport depend on the transport's kind. 84 85The transport classes are :ref:`not thread safe <asyncio-multithreading>`. 86 87 88Transports Hierarchy 89-------------------- 90 91.. class:: BaseTransport 92 93 Base class for all transports. Contains methods that all 94 asyncio transports share. 95 96.. class:: WriteTransport(BaseTransport) 97 98 A base transport for write-only connections. 99 100 Instances of the *WriteTransport* class are returned from 101 the :meth:`loop.connect_write_pipe` event loop method and 102 are also used by subprocess-related methods like 103 :meth:`loop.subprocess_exec`. 104 105.. class:: ReadTransport(BaseTransport) 106 107 A base transport for read-only connections. 108 109 Instances of the *ReadTransport* class are returned from 110 the :meth:`loop.connect_read_pipe` event loop method and 111 are also used by subprocess-related methods like 112 :meth:`loop.subprocess_exec`. 113 114.. class:: Transport(WriteTransport, ReadTransport) 115 116 Interface representing a bidirectional transport, such as a 117 TCP connection. 118 119 The user does not instantiate a transport directly; they call a 120 utility function, passing it a protocol factory and other 121 information necessary to create the transport and protocol. 122 123 Instances of the *Transport* class are returned from or used by 124 event loop methods like :meth:`loop.create_connection`, 125 :meth:`loop.create_unix_connection`, 126 :meth:`loop.create_server`, :meth:`loop.sendfile`, etc. 127 128 129.. class:: DatagramTransport(BaseTransport) 130 131 A transport for datagram (UDP) connections. 132 133 Instances of the *DatagramTransport* class are returned from 134 the :meth:`loop.create_datagram_endpoint` event loop method. 135 136 137.. class:: SubprocessTransport(BaseTransport) 138 139 An abstraction to represent a connection between a parent and its 140 child OS process. 141 142 Instances of the *SubprocessTransport* class are returned from 143 event loop methods :meth:`loop.subprocess_shell` and 144 :meth:`loop.subprocess_exec`. 145 146 147Base Transport 148-------------- 149 150.. method:: BaseTransport.close() 151 152 Close the transport. 153 154 If the transport has a buffer for outgoing 155 data, buffered data will be flushed asynchronously. No more data 156 will be received. After all buffered data is flushed, the 157 protocol's :meth:`protocol.connection_lost() 158 <BaseProtocol.connection_lost>` method will be called with 159 :const:`None` as its argument. 160 161.. method:: BaseTransport.is_closing() 162 163 Return ``True`` if the transport is closing or is closed. 164 165.. method:: BaseTransport.get_extra_info(name, default=None) 166 167 Return information about the transport or underlying resources 168 it uses. 169 170 *name* is a string representing the piece of transport-specific 171 information to get. 172 173 *default* is the value to return if the information is not 174 available, or if the transport does not support querying it 175 with the given third-party event loop implementation or on the 176 current platform. 177 178 For example, the following code attempts to get the underlying 179 socket object of the transport:: 180 181 sock = transport.get_extra_info('socket') 182 if sock is not None: 183 print(sock.getsockopt(...)) 184 185 Categories of information that can be queried on some transports: 186 187 * socket: 188 189 - ``'peername'``: the remote address to which the socket is 190 connected, result of :meth:`socket.socket.getpeername` 191 (``None`` on error) 192 193 - ``'socket'``: :class:`socket.socket` instance 194 195 - ``'sockname'``: the socket's own address, 196 result of :meth:`socket.socket.getsockname` 197 198 * SSL socket: 199 200 - ``'compression'``: the compression algorithm being used as a 201 string, or ``None`` if the connection isn't compressed; result 202 of :meth:`ssl.SSLSocket.compression` 203 204 - ``'cipher'``: a three-value tuple containing the name of the 205 cipher being used, the version of the SSL protocol that defines 206 its use, and the number of secret bits being used; result of 207 :meth:`ssl.SSLSocket.cipher` 208 209 - ``'peercert'``: peer certificate; result of 210 :meth:`ssl.SSLSocket.getpeercert` 211 212 - ``'sslcontext'``: :class:`ssl.SSLContext` instance 213 214 - ``'ssl_object'``: :class:`ssl.SSLObject` or 215 :class:`ssl.SSLSocket` instance 216 217 * pipe: 218 219 - ``'pipe'``: pipe object 220 221 * subprocess: 222 223 - ``'subprocess'``: :class:`subprocess.Popen` instance 224 225.. method:: BaseTransport.set_protocol(protocol) 226 227 Set a new protocol. 228 229 Switching protocol should only be done when both 230 protocols are documented to support the switch. 231 232.. method:: BaseTransport.get_protocol() 233 234 Return the current protocol. 235 236 237Read-only Transports 238-------------------- 239 240.. method:: ReadTransport.is_reading() 241 242 Return ``True`` if the transport is receiving new data. 243 244 .. versionadded:: 3.7 245 246.. method:: ReadTransport.pause_reading() 247 248 Pause the receiving end of the transport. No data will be passed to 249 the protocol's :meth:`protocol.data_received() <Protocol.data_received>` 250 method until :meth:`resume_reading` is called. 251 252 .. versionchanged:: 3.7 253 The method is idempotent, i.e. it can be called when the 254 transport is already paused or closed. 255 256.. method:: ReadTransport.resume_reading() 257 258 Resume the receiving end. The protocol's 259 :meth:`protocol.data_received() <Protocol.data_received>` method 260 will be called once again if some data is available for reading. 261 262 .. versionchanged:: 3.7 263 The method is idempotent, i.e. it can be called when the 264 transport is already reading. 265 266 267Write-only Transports 268--------------------- 269 270.. method:: WriteTransport.abort() 271 272 Close the transport immediately, without waiting for pending operations 273 to complete. Buffered data will be lost. No more data will be received. 274 The protocol's :meth:`protocol.connection_lost() 275 <BaseProtocol.connection_lost>` method will eventually be 276 called with :const:`None` as its argument. 277 278.. method:: WriteTransport.can_write_eof() 279 280 Return :const:`True` if the transport supports 281 :meth:`~WriteTransport.write_eof`, :const:`False` if not. 282 283.. method:: WriteTransport.get_write_buffer_size() 284 285 Return the current size of the output buffer used by the transport. 286 287.. method:: WriteTransport.get_write_buffer_limits() 288 289 Get the *high* and *low* watermarks for write flow control. Return a 290 tuple ``(low, high)`` where *low* and *high* are positive number of 291 bytes. 292 293 Use :meth:`set_write_buffer_limits` to set the limits. 294 295 .. versionadded:: 3.4.2 296 297.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None) 298 299 Set the *high* and *low* watermarks for write flow control. 300 301 These two values (measured in number of 302 bytes) control when the protocol's 303 :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>` 304 and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>` 305 methods are called. If specified, the low watermark must be less 306 than or equal to the high watermark. Neither *high* nor *low* 307 can be negative. 308 309 :meth:`~BaseProtocol.pause_writing` is called when the buffer size 310 becomes greater than or equal to the *high* value. If writing has 311 been paused, :meth:`~BaseProtocol.resume_writing` is called when 312 the buffer size becomes less than or equal to the *low* value. 313 314 The defaults are implementation-specific. If only the 315 high watermark is given, the low watermark defaults to an 316 implementation-specific value less than or equal to the 317 high watermark. Setting *high* to zero forces *low* to zero as 318 well, and causes :meth:`~BaseProtocol.pause_writing` to be called 319 whenever the buffer becomes non-empty. Setting *low* to zero causes 320 :meth:`~BaseProtocol.resume_writing` to be called only once the 321 buffer is empty. Use of zero for either limit is generally 322 sub-optimal as it reduces opportunities for doing I/O and 323 computation concurrently. 324 325 Use :meth:`~WriteTransport.get_write_buffer_limits` 326 to get the limits. 327 328.. method:: WriteTransport.write(data) 329 330 Write some *data* bytes to the transport. 331 332 This method does not block; it buffers the data and arranges for it 333 to be sent out asynchronously. 334 335.. method:: WriteTransport.writelines(list_of_data) 336 337 Write a list (or any iterable) of data bytes to the transport. 338 This is functionally equivalent to calling :meth:`write` on each 339 element yielded by the iterable, but may be implemented more 340 efficiently. 341 342.. method:: WriteTransport.write_eof() 343 344 Close the write end of the transport after flushing all buffered data. 345 Data may still be received. 346 347 This method can raise :exc:`NotImplementedError` if the transport 348 (e.g. SSL) doesn't support half-closed connections. 349 350 351Datagram Transports 352------------------- 353 354.. method:: DatagramTransport.sendto(data, addr=None) 355 356 Send the *data* bytes to the remote peer given by *addr* (a 357 transport-dependent target address). If *addr* is :const:`None`, 358 the data is sent to the target address given on transport 359 creation. 360 361 This method does not block; it buffers the data and arranges 362 for it to be sent out asynchronously. 363 364.. method:: DatagramTransport.abort() 365 366 Close the transport immediately, without waiting for pending 367 operations to complete. Buffered data will be lost. 368 No more data will be received. The protocol's 369 :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>` 370 method will eventually be called with :const:`None` as its argument. 371 372 373.. _asyncio-subprocess-transports: 374 375Subprocess Transports 376--------------------- 377 378.. method:: SubprocessTransport.get_pid() 379 380 Return the subprocess process id as an integer. 381 382.. method:: SubprocessTransport.get_pipe_transport(fd) 383 384 Return the transport for the communication pipe corresponding to the 385 integer file descriptor *fd*: 386 387 * ``0``: readable streaming transport of the standard input (*stdin*), 388 or :const:`None` if the subprocess was not created with ``stdin=PIPE`` 389 * ``1``: writable streaming transport of the standard output (*stdout*), 390 or :const:`None` if the subprocess was not created with ``stdout=PIPE`` 391 * ``2``: writable streaming transport of the standard error (*stderr*), 392 or :const:`None` if the subprocess was not created with ``stderr=PIPE`` 393 * other *fd*: :const:`None` 394 395.. method:: SubprocessTransport.get_returncode() 396 397 Return the subprocess return code as an integer or :const:`None` 398 if it hasn't returned, which is similar to the 399 :attr:`subprocess.Popen.returncode` attribute. 400 401.. method:: SubprocessTransport.kill() 402 403 Kill the subprocess. 404 405 On POSIX systems, the function sends SIGKILL to the subprocess. 406 On Windows, this method is an alias for :meth:`terminate`. 407 408 See also :meth:`subprocess.Popen.kill`. 409 410.. method:: SubprocessTransport.send_signal(signal) 411 412 Send the *signal* number to the subprocess, as in 413 :meth:`subprocess.Popen.send_signal`. 414 415.. method:: SubprocessTransport.terminate() 416 417 Stop the subprocess. 418 419 On POSIX systems, this method sends SIGTERM to the subprocess. 420 On Windows, the Windows API function TerminateProcess() is called to 421 stop the subprocess. 422 423 See also :meth:`subprocess.Popen.terminate`. 424 425.. method:: SubprocessTransport.close() 426 427 Kill the subprocess by calling the :meth:`kill` method. 428 429 If the subprocess hasn't returned yet, and close transports of 430 *stdin*, *stdout*, and *stderr* pipes. 431 432 433.. _asyncio-protocol: 434 435Protocols 436========= 437 438**Source code:** :source:`Lib/asyncio/protocols.py` 439 440--------------------------------------------------- 441 442asyncio provides a set of abstract base classes that should be used 443to implement network protocols. Those classes are meant to be used 444together with :ref:`transports <asyncio-transport>`. 445 446Subclasses of abstract base protocol classes may implement some or 447all methods. All these methods are callbacks: they are called by 448transports on certain events, for example when some data is received. 449A base protocol method should be called by the corresponding transport. 450 451 452Base Protocols 453-------------- 454 455.. class:: BaseProtocol 456 457 Base protocol with methods that all protocols share. 458 459.. class:: Protocol(BaseProtocol) 460 461 The base class for implementing streaming protocols 462 (TCP, Unix sockets, etc). 463 464.. class:: BufferedProtocol(BaseProtocol) 465 466 A base class for implementing streaming protocols with manual 467 control of the receive buffer. 468 469.. class:: DatagramProtocol(BaseProtocol) 470 471 The base class for implementing datagram (UDP) protocols. 472 473.. class:: SubprocessProtocol(BaseProtocol) 474 475 The base class for implementing protocols communicating with child 476 processes (unidirectional pipes). 477 478 479Base Protocol 480------------- 481 482All asyncio protocols can implement Base Protocol callbacks. 483 484.. rubric:: Connection Callbacks 485 486Connection callbacks are called on all protocols, exactly once per 487a successful connection. All other protocol callbacks can only be 488called between those two methods. 489 490.. method:: BaseProtocol.connection_made(transport) 491 492 Called when a connection is made. 493 494 The *transport* argument is the transport representing the 495 connection. The protocol is responsible for storing the reference 496 to its transport. 497 498.. method:: BaseProtocol.connection_lost(exc) 499 500 Called when the connection is lost or closed. 501 502 The argument is either an exception object or :const:`None`. 503 The latter means a regular EOF is received, or the connection was 504 aborted or closed by this side of the connection. 505 506 507.. rubric:: Flow Control Callbacks 508 509Flow control callbacks can be called by transports to pause or 510resume writing performed by the protocol. 511 512See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits` 513method for more details. 514 515.. method:: BaseProtocol.pause_writing() 516 517 Called when the transport's buffer goes over the high watermark. 518 519.. method:: BaseProtocol.resume_writing() 520 521 Called when the transport's buffer drains below the low watermark. 522 523If the buffer size equals the high watermark, 524:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must 525go strictly over. 526 527Conversely, :meth:`~BaseProtocol.resume_writing` is called when the 528buffer size is equal or lower than the low watermark. These end 529conditions are important to ensure that things go as expected when 530either mark is zero. 531 532 533Streaming Protocols 534------------------- 535 536Event methods, such as :meth:`loop.create_server`, 537:meth:`loop.create_unix_server`, :meth:`loop.create_connection`, 538:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`, 539:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe` 540accept factories that return streaming protocols. 541 542.. method:: Protocol.data_received(data) 543 544 Called when some data is received. *data* is a non-empty bytes 545 object containing the incoming data. 546 547 Whether the data is buffered, chunked or reassembled depends on 548 the transport. In general, you shouldn't rely on specific semantics 549 and instead make your parsing generic and flexible. However, 550 data is always received in the correct order. 551 552 The method can be called an arbitrary number of times while 553 a connection is open. 554 555 However, :meth:`protocol.eof_received() <Protocol.eof_received>` 556 is called at most once. Once `eof_received()` is called, 557 ``data_received()`` is not called anymore. 558 559.. method:: Protocol.eof_received() 560 561 Called when the other end signals it won't send any more data 562 (for example by calling :meth:`transport.write_eof() 563 <WriteTransport.write_eof>`, if the other end also uses 564 asyncio). 565 566 This method may return a false value (including ``None``), in which case 567 the transport will close itself. Conversely, if this method returns a 568 true value, the protocol used determines whether to close the transport. 569 Since the default implementation returns ``None``, it implicitly closes the 570 connection. 571 572 Some transports, including SSL, don't support half-closed connections, 573 in which case returning true from this method will result in the connection 574 being closed. 575 576 577State machine: 578 579.. code-block:: none 580 581 start -> connection_made 582 [-> data_received]* 583 [-> eof_received]? 584 -> connection_lost -> end 585 586 587Buffered Streaming Protocols 588---------------------------- 589 590.. versionadded:: 3.7 591 592Buffered Protocols can be used with any event loop method 593that supports `Streaming Protocols`_. 594 595``BufferedProtocol`` implementations allow explicit manual allocation 596and control of the receive buffer. Event loops can then use the buffer 597provided by the protocol to avoid unnecessary data copies. This 598can result in noticeable performance improvement for protocols that 599receive big amounts of data. Sophisticated protocol implementations 600can significantly reduce the number of buffer allocations. 601 602The following callbacks are called on :class:`BufferedProtocol` 603instances: 604 605.. method:: BufferedProtocol.get_buffer(sizehint) 606 607 Called to allocate a new receive buffer. 608 609 *sizehint* is the recommended minimum size for the returned 610 buffer. It is acceptable to return smaller or larger buffers 611 than what *sizehint* suggests. When set to -1, the buffer size 612 can be arbitrary. It is an error to return a buffer with a zero size. 613 614 ``get_buffer()`` must return an object implementing the 615 :ref:`buffer protocol <bufferobjects>`. 616 617.. method:: BufferedProtocol.buffer_updated(nbytes) 618 619 Called when the buffer was updated with the received data. 620 621 *nbytes* is the total number of bytes that were written to the buffer. 622 623.. method:: BufferedProtocol.eof_received() 624 625 See the documentation of the :meth:`protocol.eof_received() 626 <Protocol.eof_received>` method. 627 628 629:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number 630of times during a connection. However, :meth:`protocol.eof_received() 631<Protocol.eof_received>` is called at most once 632and, if called, :meth:`~BufferedProtocol.get_buffer` and 633:meth:`~BufferedProtocol.buffer_updated` won't be called after it. 634 635State machine: 636 637.. code-block:: none 638 639 start -> connection_made 640 [-> get_buffer 641 [-> buffer_updated]? 642 ]* 643 [-> eof_received]? 644 -> connection_lost -> end 645 646 647Datagram Protocols 648------------------ 649 650Datagram Protocol instances should be constructed by protocol 651factories passed to the :meth:`loop.create_datagram_endpoint` method. 652 653.. method:: DatagramProtocol.datagram_received(data, addr) 654 655 Called when a datagram is received. *data* is a bytes object containing 656 the incoming data. *addr* is the address of the peer sending the data; 657 the exact format depends on the transport. 658 659.. method:: DatagramProtocol.error_received(exc) 660 661 Called when a previous send or receive operation raises an 662 :class:`OSError`. *exc* is the :class:`OSError` instance. 663 664 This method is called in rare conditions, when the transport (e.g. UDP) 665 detects that a datagram could not be delivered to its recipient. 666 In many conditions though, undeliverable datagrams will be silently 667 dropped. 668 669.. note:: 670 671 On BSD systems (macOS, FreeBSD, etc.) flow control is not supported 672 for datagram protocols, because there is no reliable way to detect send 673 failures caused by writing too many packets. 674 675 The socket always appears 'ready' and excess packets are dropped. An 676 :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may 677 or may not be raised; if it is raised, it will be reported to 678 :meth:`DatagramProtocol.error_received` but otherwise ignored. 679 680 681.. _asyncio-subprocess-protocols: 682 683Subprocess Protocols 684-------------------- 685 686Datagram Protocol instances should be constructed by protocol 687factories passed to the :meth:`loop.subprocess_exec` and 688:meth:`loop.subprocess_shell` methods. 689 690.. method:: SubprocessProtocol.pipe_data_received(fd, data) 691 692 Called when the child process writes data into its stdout or stderr 693 pipe. 694 695 *fd* is the integer file descriptor of the pipe. 696 697 *data* is a non-empty bytes object containing the received data. 698 699.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc) 700 701 Called when one of the pipes communicating with the child process 702 is closed. 703 704 *fd* is the integer file descriptor that was closed. 705 706.. method:: SubprocessProtocol.process_exited() 707 708 Called when the child process has exited. 709 710 711Examples 712======== 713 714.. _asyncio_example_tcp_echo_server_protocol: 715 716TCP Echo Server 717--------------- 718 719Create a TCP echo server using the :meth:`loop.create_server` method, send back 720received data, and close the connection:: 721 722 import asyncio 723 724 725 class EchoServerProtocol(asyncio.Protocol): 726 def connection_made(self, transport): 727 peername = transport.get_extra_info('peername') 728 print('Connection from {}'.format(peername)) 729 self.transport = transport 730 731 def data_received(self, data): 732 message = data.decode() 733 print('Data received: {!r}'.format(message)) 734 735 print('Send: {!r}'.format(message)) 736 self.transport.write(data) 737 738 print('Close the client socket') 739 self.transport.close() 740 741 742 async def main(): 743 # Get a reference to the event loop as we plan to use 744 # low-level APIs. 745 loop = asyncio.get_running_loop() 746 747 server = await loop.create_server( 748 lambda: EchoServerProtocol(), 749 '127.0.0.1', 8888) 750 751 async with server: 752 await server.serve_forever() 753 754 755 asyncio.run(main()) 756 757 758.. seealso:: 759 760 The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>` 761 example uses the high-level :func:`asyncio.start_server` function. 762 763.. _asyncio_example_tcp_echo_client_protocol: 764 765TCP Echo Client 766--------------- 767 768A TCP echo client using the :meth:`loop.create_connection` method, sends 769data, and waits until the connection is closed:: 770 771 import asyncio 772 773 774 class EchoClientProtocol(asyncio.Protocol): 775 def __init__(self, message, on_con_lost): 776 self.message = message 777 self.on_con_lost = on_con_lost 778 779 def connection_made(self, transport): 780 transport.write(self.message.encode()) 781 print('Data sent: {!r}'.format(self.message)) 782 783 def data_received(self, data): 784 print('Data received: {!r}'.format(data.decode())) 785 786 def connection_lost(self, exc): 787 print('The server closed the connection') 788 self.on_con_lost.set_result(True) 789 790 791 async def main(): 792 # Get a reference to the event loop as we plan to use 793 # low-level APIs. 794 loop = asyncio.get_running_loop() 795 796 on_con_lost = loop.create_future() 797 message = 'Hello World!' 798 799 transport, protocol = await loop.create_connection( 800 lambda: EchoClientProtocol(message, on_con_lost), 801 '127.0.0.1', 8888) 802 803 # Wait until the protocol signals that the connection 804 # is lost and close the transport. 805 try: 806 await on_con_lost 807 finally: 808 transport.close() 809 810 811 asyncio.run(main()) 812 813 814.. seealso:: 815 816 The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` 817 example uses the high-level :func:`asyncio.open_connection` function. 818 819 820.. _asyncio-udp-echo-server-protocol: 821 822UDP Echo Server 823--------------- 824 825A UDP echo server, using the :meth:`loop.create_datagram_endpoint` 826method, sends back received data:: 827 828 import asyncio 829 830 831 class EchoServerProtocol: 832 def connection_made(self, transport): 833 self.transport = transport 834 835 def datagram_received(self, data, addr): 836 message = data.decode() 837 print('Received %r from %s' % (message, addr)) 838 print('Send %r to %s' % (message, addr)) 839 self.transport.sendto(data, addr) 840 841 842 async def main(): 843 print("Starting UDP server") 844 845 # Get a reference to the event loop as we plan to use 846 # low-level APIs. 847 loop = asyncio.get_running_loop() 848 849 # One protocol instance will be created to serve all 850 # client requests. 851 transport, protocol = await loop.create_datagram_endpoint( 852 lambda: EchoServerProtocol(), 853 local_addr=('127.0.0.1', 9999)) 854 855 try: 856 await asyncio.sleep(3600) # Serve for 1 hour. 857 finally: 858 transport.close() 859 860 861 asyncio.run(main()) 862 863 864.. _asyncio-udp-echo-client-protocol: 865 866UDP Echo Client 867--------------- 868 869A UDP echo client, using the :meth:`loop.create_datagram_endpoint` 870method, sends data and closes the transport when it receives the answer:: 871 872 import asyncio 873 874 875 class EchoClientProtocol: 876 def __init__(self, message, on_con_lost): 877 self.message = message 878 self.on_con_lost = on_con_lost 879 self.transport = None 880 881 def connection_made(self, transport): 882 self.transport = transport 883 print('Send:', self.message) 884 self.transport.sendto(self.message.encode()) 885 886 def datagram_received(self, data, addr): 887 print("Received:", data.decode()) 888 889 print("Close the socket") 890 self.transport.close() 891 892 def error_received(self, exc): 893 print('Error received:', exc) 894 895 def connection_lost(self, exc): 896 print("Connection closed") 897 self.on_con_lost.set_result(True) 898 899 900 async def main(): 901 # Get a reference to the event loop as we plan to use 902 # low-level APIs. 903 loop = asyncio.get_running_loop() 904 905 on_con_lost = loop.create_future() 906 message = "Hello World!" 907 908 transport, protocol = await loop.create_datagram_endpoint( 909 lambda: EchoClientProtocol(message, on_con_lost), 910 remote_addr=('127.0.0.1', 9999)) 911 912 try: 913 await on_con_lost 914 finally: 915 transport.close() 916 917 918 asyncio.run(main()) 919 920 921.. _asyncio_example_create_connection: 922 923Connecting Existing Sockets 924--------------------------- 925 926Wait until a socket receives data using the 927:meth:`loop.create_connection` method with a protocol:: 928 929 import asyncio 930 import socket 931 932 933 class MyProtocol(asyncio.Protocol): 934 935 def __init__(self, on_con_lost): 936 self.transport = None 937 self.on_con_lost = on_con_lost 938 939 def connection_made(self, transport): 940 self.transport = transport 941 942 def data_received(self, data): 943 print("Received:", data.decode()) 944 945 # We are done: close the transport; 946 # connection_lost() will be called automatically. 947 self.transport.close() 948 949 def connection_lost(self, exc): 950 # The socket has been closed 951 self.on_con_lost.set_result(True) 952 953 954 async def main(): 955 # Get a reference to the event loop as we plan to use 956 # low-level APIs. 957 loop = asyncio.get_running_loop() 958 on_con_lost = loop.create_future() 959 960 # Create a pair of connected sockets 961 rsock, wsock = socket.socketpair() 962 963 # Register the socket to wait for data. 964 transport, protocol = await loop.create_connection( 965 lambda: MyProtocol(on_con_lost), sock=rsock) 966 967 # Simulate the reception of data from the network. 968 loop.call_soon(wsock.send, 'abc'.encode()) 969 970 try: 971 await protocol.on_con_lost 972 finally: 973 transport.close() 974 wsock.close() 975 976 asyncio.run(main()) 977 978.. seealso:: 979 980 The :ref:`watch a file descriptor for read events 981 <asyncio_example_watch_fd>` example uses the low-level 982 :meth:`loop.add_reader` method to register an FD. 983 984 The :ref:`register an open socket to wait for data using streams 985 <asyncio_example_create_connection-streams>` example uses high-level streams 986 created by the :func:`open_connection` function in a coroutine. 987 988.. _asyncio_example_subprocess_proto: 989 990loop.subprocess_exec() and SubprocessProtocol 991--------------------------------------------- 992 993An example of a subprocess protocol used to get the output of a 994subprocess and to wait for the subprocess exit. 995 996The subprocess is created by the :meth:`loop.subprocess_exec` method:: 997 998 import asyncio 999 import sys 1000 1001 class DateProtocol(asyncio.SubprocessProtocol): 1002 def __init__(self, exit_future): 1003 self.exit_future = exit_future 1004 self.output = bytearray() 1005 1006 def pipe_data_received(self, fd, data): 1007 self.output.extend(data) 1008 1009 def process_exited(self): 1010 self.exit_future.set_result(True) 1011 1012 async def get_date(): 1013 # Get a reference to the event loop as we plan to use 1014 # low-level APIs. 1015 loop = asyncio.get_running_loop() 1016 1017 code = 'import datetime; print(datetime.datetime.now())' 1018 exit_future = asyncio.Future(loop=loop) 1019 1020 # Create the subprocess controlled by DateProtocol; 1021 # redirect the standard output into a pipe. 1022 transport, protocol = await loop.subprocess_exec( 1023 lambda: DateProtocol(exit_future), 1024 sys.executable, '-c', code, 1025 stdin=None, stderr=None) 1026 1027 # Wait for the subprocess exit using the process_exited() 1028 # method of the protocol. 1029 await exit_future 1030 1031 # Close the stdout pipe. 1032 transport.close() 1033 1034 # Read the output which was collected by the 1035 # pipe_data_received() method of the protocol. 1036 data = bytes(protocol.output) 1037 return data.decode('ascii').rstrip() 1038 1039 date = asyncio.run(get_date()) 1040 print(f"Current date: {date}") 1041 1042See also the :ref:`same example <asyncio_example_create_subprocess_exec>` 1043written using high-level APIs. 1044