1.. currentmodule:: asyncio 2 3 4.. _asyncio-transports-protocols: 5 6 7======================== 8Transports and Protocols 9======================== 10 11.. rubric:: Preface 12 13Transports and Protocols are used by the **low-level** event loop 14APIs such as :meth:`loop.create_connection`. They use 15callback-based programming style and enable high-performance 16implementations of network or IPC protocols (e.g. HTTP). 17 18Essentially, transports and protocols should only be used in 19libraries and frameworks and never in high-level asyncio 20applications. 21 22This documentation page covers both `Transports`_ and `Protocols`_. 23 24.. rubric:: Introduction 25 26At the highest level, the transport is concerned with *how* bytes 27are transmitted, while the protocol determines *which* bytes to 28transmit (and to some extent when). 29 30A different way of saying the same thing: a transport is an 31abstraction for a socket (or similar I/O endpoint) while a protocol 32is an abstraction for an application, from the transport's point 33of view. 34 35Yet another view is the transport and protocol interfaces 36together define an abstract interface for using network I/O and 37interprocess I/O. 38 39There is always a 1:1 relationship between transport and protocol 40objects: the protocol calls transport methods to send data, 41while the transport calls protocol methods to pass it data that 42has been received. 43 44Most of connection oriented event loop methods 45(such as :meth:`loop.create_connection`) usually accept a 46*protocol_factory* argument used to create a *Protocol* object 47for an accepted connection, represented by a *Transport* object. 48Such methods usually return a tuple of ``(transport, protocol)``. 49 50.. rubric:: Contents 51 52This documentation page contains the following sections: 53 54* The `Transports`_ section documents asyncio :class:`BaseTransport`, 55 :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`, 56 :class:`DatagramTransport`, and :class:`SubprocessTransport` 57 classes. 58 59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`, 60 :class:`Protocol`, :class:`BufferedProtocol`, 61 :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes. 62 63* The `Examples`_ section showcases how to work with transports, 64 protocols, and low-level event loop APIs. 65 66 67.. _asyncio-transport: 68 69Transports 70========== 71 72**Source code:** :source:`Lib/asyncio/transports.py` 73 74---------------------------------------------------- 75 76Transports are classes provided by :mod:`asyncio` in order to abstract 77various kinds of communication channels. 78 79Transport objects are always instantiated by an 80:ref:`asyncio event loop <asyncio-event-loop>`. 81 82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes. 83The methods available on a transport depend on the transport's kind. 84 85The transport classes are :ref:`not thread safe <asyncio-multithreading>`. 86 87 88Transports Hierarchy 89-------------------- 90 91.. class:: BaseTransport 92 93 Base class for all transports. Contains methods that all 94 asyncio transports share. 95 96.. class:: WriteTransport(BaseTransport) 97 98 A base transport for write-only connections. 99 100 Instances of the *WriteTransport* class are returned from 101 the :meth:`loop.connect_write_pipe` event loop method and 102 are also used by subprocess-related methods like 103 :meth:`loop.subprocess_exec`. 104 105.. class:: ReadTransport(BaseTransport) 106 107 A base transport for read-only connections. 108 109 Instances of the *ReadTransport* class are returned from 110 the :meth:`loop.connect_read_pipe` event loop method and 111 are also used by subprocess-related methods like 112 :meth:`loop.subprocess_exec`. 113 114.. class:: Transport(WriteTransport, ReadTransport) 115 116 Interface representing a bidirectional transport, such as a 117 TCP connection. 118 119 The user does not instantiate a transport directly; they call a 120 utility function, passing it a protocol factory and other 121 information necessary to create the transport and protocol. 122 123 Instances of the *Transport* class are returned from or used by 124 event loop methods like :meth:`loop.create_connection`, 125 :meth:`loop.create_unix_connection`, 126 :meth:`loop.create_server`, :meth:`loop.sendfile`, etc. 127 128 129.. class:: DatagramTransport(BaseTransport) 130 131 A transport for datagram (UDP) connections. 132 133 Instances of the *DatagramTransport* class are returned from 134 the :meth:`loop.create_datagram_endpoint` event loop method. 135 136 137.. class:: SubprocessTransport(BaseTransport) 138 139 An abstraction to represent a connection between a parent and its 140 child OS process. 141 142 Instances of the *SubprocessTransport* class are returned from 143 event loop methods :meth:`loop.subprocess_shell` and 144 :meth:`loop.subprocess_exec`. 145 146 147Base Transport 148-------------- 149 150.. method:: BaseTransport.close() 151 152 Close the transport. 153 154 If the transport has a buffer for outgoing 155 data, buffered data will be flushed asynchronously. No more data 156 will be received. After all buffered data is flushed, the 157 protocol's :meth:`protocol.connection_lost() 158 <BaseProtocol.connection_lost>` method will be called with 159 :const:`None` as its argument. The transport should not be 160 used once it is closed. 161 162.. method:: BaseTransport.is_closing() 163 164 Return ``True`` if the transport is closing or is closed. 165 166.. method:: BaseTransport.get_extra_info(name, default=None) 167 168 Return information about the transport or underlying resources 169 it uses. 170 171 *name* is a string representing the piece of transport-specific 172 information to get. 173 174 *default* is the value to return if the information is not 175 available, or if the transport does not support querying it 176 with the given third-party event loop implementation or on the 177 current platform. 178 179 For example, the following code attempts to get the underlying 180 socket object of the transport:: 181 182 sock = transport.get_extra_info('socket') 183 if sock is not None: 184 print(sock.getsockopt(...)) 185 186 Categories of information that can be queried on some transports: 187 188 * socket: 189 190 - ``'peername'``: the remote address to which the socket is 191 connected, result of :meth:`socket.socket.getpeername` 192 (``None`` on error) 193 194 - ``'socket'``: :class:`socket.socket` instance 195 196 - ``'sockname'``: the socket's own address, 197 result of :meth:`socket.socket.getsockname` 198 199 * SSL socket: 200 201 - ``'compression'``: the compression algorithm being used as a 202 string, or ``None`` if the connection isn't compressed; result 203 of :meth:`ssl.SSLSocket.compression` 204 205 - ``'cipher'``: a three-value tuple containing the name of the 206 cipher being used, the version of the SSL protocol that defines 207 its use, and the number of secret bits being used; result of 208 :meth:`ssl.SSLSocket.cipher` 209 210 - ``'peercert'``: peer certificate; result of 211 :meth:`ssl.SSLSocket.getpeercert` 212 213 - ``'sslcontext'``: :class:`ssl.SSLContext` instance 214 215 - ``'ssl_object'``: :class:`ssl.SSLObject` or 216 :class:`ssl.SSLSocket` instance 217 218 * pipe: 219 220 - ``'pipe'``: pipe object 221 222 * subprocess: 223 224 - ``'subprocess'``: :class:`subprocess.Popen` instance 225 226.. method:: BaseTransport.set_protocol(protocol) 227 228 Set a new protocol. 229 230 Switching protocol should only be done when both 231 protocols are documented to support the switch. 232 233.. method:: BaseTransport.get_protocol() 234 235 Return the current protocol. 236 237 238Read-only Transports 239-------------------- 240 241.. method:: ReadTransport.is_reading() 242 243 Return ``True`` if the transport is receiving new data. 244 245 .. versionadded:: 3.7 246 247.. method:: ReadTransport.pause_reading() 248 249 Pause the receiving end of the transport. No data will be passed to 250 the protocol's :meth:`protocol.data_received() <Protocol.data_received>` 251 method until :meth:`resume_reading` is called. 252 253 .. versionchanged:: 3.7 254 The method is idempotent, i.e. it can be called when the 255 transport is already paused or closed. 256 257.. method:: ReadTransport.resume_reading() 258 259 Resume the receiving end. The protocol's 260 :meth:`protocol.data_received() <Protocol.data_received>` method 261 will be called once again if some data is available for reading. 262 263 .. versionchanged:: 3.7 264 The method is idempotent, i.e. it can be called when the 265 transport is already reading. 266 267 268Write-only Transports 269--------------------- 270 271.. method:: WriteTransport.abort() 272 273 Close the transport immediately, without waiting for pending operations 274 to complete. Buffered data will be lost. No more data will be received. 275 The protocol's :meth:`protocol.connection_lost() 276 <BaseProtocol.connection_lost>` method will eventually be 277 called with :const:`None` as its argument. 278 279.. method:: WriteTransport.can_write_eof() 280 281 Return :const:`True` if the transport supports 282 :meth:`~WriteTransport.write_eof`, :const:`False` if not. 283 284.. method:: WriteTransport.get_write_buffer_size() 285 286 Return the current size of the output buffer used by the transport. 287 288.. method:: WriteTransport.get_write_buffer_limits() 289 290 Get the *high* and *low* watermarks for write flow control. Return a 291 tuple ``(low, high)`` where *low* and *high* are positive number of 292 bytes. 293 294 Use :meth:`set_write_buffer_limits` to set the limits. 295 296 .. versionadded:: 3.4.2 297 298.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None) 299 300 Set the *high* and *low* watermarks for write flow control. 301 302 These two values (measured in number of 303 bytes) control when the protocol's 304 :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>` 305 and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>` 306 methods are called. If specified, the low watermark must be less 307 than or equal to the high watermark. Neither *high* nor *low* 308 can be negative. 309 310 :meth:`~BaseProtocol.pause_writing` is called when the buffer size 311 becomes greater than or equal to the *high* value. If writing has 312 been paused, :meth:`~BaseProtocol.resume_writing` is called when 313 the buffer size becomes less than or equal to the *low* value. 314 315 The defaults are implementation-specific. If only the 316 high watermark is given, the low watermark defaults to an 317 implementation-specific value less than or equal to the 318 high watermark. Setting *high* to zero forces *low* to zero as 319 well, and causes :meth:`~BaseProtocol.pause_writing` to be called 320 whenever the buffer becomes non-empty. Setting *low* to zero causes 321 :meth:`~BaseProtocol.resume_writing` to be called only once the 322 buffer is empty. Use of zero for either limit is generally 323 sub-optimal as it reduces opportunities for doing I/O and 324 computation concurrently. 325 326 Use :meth:`~WriteTransport.get_write_buffer_limits` 327 to get the limits. 328 329.. method:: WriteTransport.write(data) 330 331 Write some *data* bytes to the transport. 332 333 This method does not block; it buffers the data and arranges for it 334 to be sent out asynchronously. 335 336.. method:: WriteTransport.writelines(list_of_data) 337 338 Write a list (or any iterable) of data bytes to the transport. 339 This is functionally equivalent to calling :meth:`write` on each 340 element yielded by the iterable, but may be implemented more 341 efficiently. 342 343.. method:: WriteTransport.write_eof() 344 345 Close the write end of the transport after flushing all buffered data. 346 Data may still be received. 347 348 This method can raise :exc:`NotImplementedError` if the transport 349 (e.g. SSL) doesn't support half-closed connections. 350 351 352Datagram Transports 353------------------- 354 355.. method:: DatagramTransport.sendto(data, addr=None) 356 357 Send the *data* bytes to the remote peer given by *addr* (a 358 transport-dependent target address). If *addr* is :const:`None`, 359 the data is sent to the target address given on transport 360 creation. 361 362 This method does not block; it buffers the data and arranges 363 for it to be sent out asynchronously. 364 365 .. versionchanged:: 3.13 366 This method can be called with an empty bytes object to send a 367 zero-length datagram. The buffer size calculation used for flow 368 control is also updated to account for the datagram header. 369 370.. method:: DatagramTransport.abort() 371 372 Close the transport immediately, without waiting for pending 373 operations to complete. Buffered data will be lost. 374 No more data will be received. The protocol's 375 :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>` 376 method will eventually be called with :const:`None` as its argument. 377 378 379.. _asyncio-subprocess-transports: 380 381Subprocess Transports 382--------------------- 383 384.. method:: SubprocessTransport.get_pid() 385 386 Return the subprocess process id as an integer. 387 388.. method:: SubprocessTransport.get_pipe_transport(fd) 389 390 Return the transport for the communication pipe corresponding to the 391 integer file descriptor *fd*: 392 393 * ``0``: readable streaming transport of the standard input (*stdin*), 394 or :const:`None` if the subprocess was not created with ``stdin=PIPE`` 395 * ``1``: writable streaming transport of the standard output (*stdout*), 396 or :const:`None` if the subprocess was not created with ``stdout=PIPE`` 397 * ``2``: writable streaming transport of the standard error (*stderr*), 398 or :const:`None` if the subprocess was not created with ``stderr=PIPE`` 399 * other *fd*: :const:`None` 400 401.. method:: SubprocessTransport.get_returncode() 402 403 Return the subprocess return code as an integer or :const:`None` 404 if it hasn't returned, which is similar to the 405 :attr:`subprocess.Popen.returncode` attribute. 406 407.. method:: SubprocessTransport.kill() 408 409 Kill the subprocess. 410 411 On POSIX systems, the function sends SIGKILL to the subprocess. 412 On Windows, this method is an alias for :meth:`terminate`. 413 414 See also :meth:`subprocess.Popen.kill`. 415 416.. method:: SubprocessTransport.send_signal(signal) 417 418 Send the *signal* number to the subprocess, as in 419 :meth:`subprocess.Popen.send_signal`. 420 421.. method:: SubprocessTransport.terminate() 422 423 Stop the subprocess. 424 425 On POSIX systems, this method sends :py:const:`~signal.SIGTERM` to the subprocess. 426 On Windows, the Windows API function :c:func:`!TerminateProcess` is called to 427 stop the subprocess. 428 429 See also :meth:`subprocess.Popen.terminate`. 430 431.. method:: SubprocessTransport.close() 432 433 Kill the subprocess by calling the :meth:`kill` method. 434 435 If the subprocess hasn't returned yet, and close transports of 436 *stdin*, *stdout*, and *stderr* pipes. 437 438 439.. _asyncio-protocol: 440 441Protocols 442========= 443 444**Source code:** :source:`Lib/asyncio/protocols.py` 445 446--------------------------------------------------- 447 448asyncio provides a set of abstract base classes that should be used 449to implement network protocols. Those classes are meant to be used 450together with :ref:`transports <asyncio-transport>`. 451 452Subclasses of abstract base protocol classes may implement some or 453all methods. All these methods are callbacks: they are called by 454transports on certain events, for example when some data is received. 455A base protocol method should be called by the corresponding transport. 456 457 458Base Protocols 459-------------- 460 461.. class:: BaseProtocol 462 463 Base protocol with methods that all protocols share. 464 465.. class:: Protocol(BaseProtocol) 466 467 The base class for implementing streaming protocols 468 (TCP, Unix sockets, etc). 469 470.. class:: BufferedProtocol(BaseProtocol) 471 472 A base class for implementing streaming protocols with manual 473 control of the receive buffer. 474 475.. class:: DatagramProtocol(BaseProtocol) 476 477 The base class for implementing datagram (UDP) protocols. 478 479.. class:: SubprocessProtocol(BaseProtocol) 480 481 The base class for implementing protocols communicating with child 482 processes (unidirectional pipes). 483 484 485Base Protocol 486------------- 487 488All asyncio protocols can implement Base Protocol callbacks. 489 490.. rubric:: Connection Callbacks 491 492Connection callbacks are called on all protocols, exactly once per 493a successful connection. All other protocol callbacks can only be 494called between those two methods. 495 496.. method:: BaseProtocol.connection_made(transport) 497 498 Called when a connection is made. 499 500 The *transport* argument is the transport representing the 501 connection. The protocol is responsible for storing the reference 502 to its transport. 503 504.. method:: BaseProtocol.connection_lost(exc) 505 506 Called when the connection is lost or closed. 507 508 The argument is either an exception object or :const:`None`. 509 The latter means a regular EOF is received, or the connection was 510 aborted or closed by this side of the connection. 511 512 513.. rubric:: Flow Control Callbacks 514 515Flow control callbacks can be called by transports to pause or 516resume writing performed by the protocol. 517 518See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits` 519method for more details. 520 521.. method:: BaseProtocol.pause_writing() 522 523 Called when the transport's buffer goes over the high watermark. 524 525.. method:: BaseProtocol.resume_writing() 526 527 Called when the transport's buffer drains below the low watermark. 528 529If the buffer size equals the high watermark, 530:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must 531go strictly over. 532 533Conversely, :meth:`~BaseProtocol.resume_writing` is called when the 534buffer size is equal or lower than the low watermark. These end 535conditions are important to ensure that things go as expected when 536either mark is zero. 537 538 539Streaming Protocols 540------------------- 541 542Event methods, such as :meth:`loop.create_server`, 543:meth:`loop.create_unix_server`, :meth:`loop.create_connection`, 544:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`, 545:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe` 546accept factories that return streaming protocols. 547 548.. method:: Protocol.data_received(data) 549 550 Called when some data is received. *data* is a non-empty bytes 551 object containing the incoming data. 552 553 Whether the data is buffered, chunked or reassembled depends on 554 the transport. In general, you shouldn't rely on specific semantics 555 and instead make your parsing generic and flexible. However, 556 data is always received in the correct order. 557 558 The method can be called an arbitrary number of times while 559 a connection is open. 560 561 However, :meth:`protocol.eof_received() <Protocol.eof_received>` 562 is called at most once. Once ``eof_received()`` is called, 563 ``data_received()`` is not called anymore. 564 565.. method:: Protocol.eof_received() 566 567 Called when the other end signals it won't send any more data 568 (for example by calling :meth:`transport.write_eof() 569 <WriteTransport.write_eof>`, if the other end also uses 570 asyncio). 571 572 This method may return a false value (including ``None``), in which case 573 the transport will close itself. Conversely, if this method returns a 574 true value, the protocol used determines whether to close the transport. 575 Since the default implementation returns ``None``, it implicitly closes the 576 connection. 577 578 Some transports, including SSL, don't support half-closed connections, 579 in which case returning true from this method will result in the connection 580 being closed. 581 582 583State machine: 584 585.. code-block:: none 586 587 start -> connection_made 588 [-> data_received]* 589 [-> eof_received]? 590 -> connection_lost -> end 591 592 593Buffered Streaming Protocols 594---------------------------- 595 596.. versionadded:: 3.7 597 598Buffered Protocols can be used with any event loop method 599that supports `Streaming Protocols`_. 600 601``BufferedProtocol`` implementations allow explicit manual allocation 602and control of the receive buffer. Event loops can then use the buffer 603provided by the protocol to avoid unnecessary data copies. This 604can result in noticeable performance improvement for protocols that 605receive big amounts of data. Sophisticated protocol implementations 606can significantly reduce the number of buffer allocations. 607 608The following callbacks are called on :class:`BufferedProtocol` 609instances: 610 611.. method:: BufferedProtocol.get_buffer(sizehint) 612 613 Called to allocate a new receive buffer. 614 615 *sizehint* is the recommended minimum size for the returned 616 buffer. It is acceptable to return smaller or larger buffers 617 than what *sizehint* suggests. When set to -1, the buffer size 618 can be arbitrary. It is an error to return a buffer with a zero size. 619 620 ``get_buffer()`` must return an object implementing the 621 :ref:`buffer protocol <bufferobjects>`. 622 623.. method:: BufferedProtocol.buffer_updated(nbytes) 624 625 Called when the buffer was updated with the received data. 626 627 *nbytes* is the total number of bytes that were written to the buffer. 628 629.. method:: BufferedProtocol.eof_received() 630 631 See the documentation of the :meth:`protocol.eof_received() 632 <Protocol.eof_received>` method. 633 634 635:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number 636of times during a connection. However, :meth:`protocol.eof_received() 637<Protocol.eof_received>` is called at most once 638and, if called, :meth:`~BufferedProtocol.get_buffer` and 639:meth:`~BufferedProtocol.buffer_updated` won't be called after it. 640 641State machine: 642 643.. code-block:: none 644 645 start -> connection_made 646 [-> get_buffer 647 [-> buffer_updated]? 648 ]* 649 [-> eof_received]? 650 -> connection_lost -> end 651 652 653Datagram Protocols 654------------------ 655 656Datagram Protocol instances should be constructed by protocol 657factories passed to the :meth:`loop.create_datagram_endpoint` method. 658 659.. method:: DatagramProtocol.datagram_received(data, addr) 660 661 Called when a datagram is received. *data* is a bytes object containing 662 the incoming data. *addr* is the address of the peer sending the data; 663 the exact format depends on the transport. 664 665.. method:: DatagramProtocol.error_received(exc) 666 667 Called when a previous send or receive operation raises an 668 :class:`OSError`. *exc* is the :class:`OSError` instance. 669 670 This method is called in rare conditions, when the transport (e.g. UDP) 671 detects that a datagram could not be delivered to its recipient. 672 In many conditions though, undeliverable datagrams will be silently 673 dropped. 674 675.. note:: 676 677 On BSD systems (macOS, FreeBSD, etc.) flow control is not supported 678 for datagram protocols, because there is no reliable way to detect send 679 failures caused by writing too many packets. 680 681 The socket always appears 'ready' and excess packets are dropped. An 682 :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may 683 or may not be raised; if it is raised, it will be reported to 684 :meth:`DatagramProtocol.error_received` but otherwise ignored. 685 686 687.. _asyncio-subprocess-protocols: 688 689Subprocess Protocols 690-------------------- 691 692Subprocess Protocol instances should be constructed by protocol 693factories passed to the :meth:`loop.subprocess_exec` and 694:meth:`loop.subprocess_shell` methods. 695 696.. method:: SubprocessProtocol.pipe_data_received(fd, data) 697 698 Called when the child process writes data into its stdout or stderr 699 pipe. 700 701 *fd* is the integer file descriptor of the pipe. 702 703 *data* is a non-empty bytes object containing the received data. 704 705.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc) 706 707 Called when one of the pipes communicating with the child process 708 is closed. 709 710 *fd* is the integer file descriptor that was closed. 711 712.. method:: SubprocessProtocol.process_exited() 713 714 Called when the child process has exited. 715 716 It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and 717 :meth:`~SubprocessProtocol.pipe_connection_lost` methods. 718 719 720Examples 721======== 722 723.. _asyncio_example_tcp_echo_server_protocol: 724 725TCP Echo Server 726--------------- 727 728Create a TCP echo server using the :meth:`loop.create_server` method, send back 729received data, and close the connection:: 730 731 import asyncio 732 733 734 class EchoServerProtocol(asyncio.Protocol): 735 def connection_made(self, transport): 736 peername = transport.get_extra_info('peername') 737 print('Connection from {}'.format(peername)) 738 self.transport = transport 739 740 def data_received(self, data): 741 message = data.decode() 742 print('Data received: {!r}'.format(message)) 743 744 print('Send: {!r}'.format(message)) 745 self.transport.write(data) 746 747 print('Close the client socket') 748 self.transport.close() 749 750 751 async def main(): 752 # Get a reference to the event loop as we plan to use 753 # low-level APIs. 754 loop = asyncio.get_running_loop() 755 756 server = await loop.create_server( 757 EchoServerProtocol, 758 '127.0.0.1', 8888) 759 760 async with server: 761 await server.serve_forever() 762 763 764 asyncio.run(main()) 765 766 767.. seealso:: 768 769 The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>` 770 example uses the high-level :func:`asyncio.start_server` function. 771 772.. _asyncio_example_tcp_echo_client_protocol: 773 774TCP Echo Client 775--------------- 776 777A TCP echo client using the :meth:`loop.create_connection` method, sends 778data, and waits until the connection is closed:: 779 780 import asyncio 781 782 783 class EchoClientProtocol(asyncio.Protocol): 784 def __init__(self, message, on_con_lost): 785 self.message = message 786 self.on_con_lost = on_con_lost 787 788 def connection_made(self, transport): 789 transport.write(self.message.encode()) 790 print('Data sent: {!r}'.format(self.message)) 791 792 def data_received(self, data): 793 print('Data received: {!r}'.format(data.decode())) 794 795 def connection_lost(self, exc): 796 print('The server closed the connection') 797 self.on_con_lost.set_result(True) 798 799 800 async def main(): 801 # Get a reference to the event loop as we plan to use 802 # low-level APIs. 803 loop = asyncio.get_running_loop() 804 805 on_con_lost = loop.create_future() 806 message = 'Hello World!' 807 808 transport, protocol = await loop.create_connection( 809 lambda: EchoClientProtocol(message, on_con_lost), 810 '127.0.0.1', 8888) 811 812 # Wait until the protocol signals that the connection 813 # is lost and close the transport. 814 try: 815 await on_con_lost 816 finally: 817 transport.close() 818 819 820 asyncio.run(main()) 821 822 823.. seealso:: 824 825 The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` 826 example uses the high-level :func:`asyncio.open_connection` function. 827 828 829.. _asyncio-udp-echo-server-protocol: 830 831UDP Echo Server 832--------------- 833 834A UDP echo server, using the :meth:`loop.create_datagram_endpoint` 835method, sends back received data:: 836 837 import asyncio 838 839 840 class EchoServerProtocol: 841 def connection_made(self, transport): 842 self.transport = transport 843 844 def datagram_received(self, data, addr): 845 message = data.decode() 846 print('Received %r from %s' % (message, addr)) 847 print('Send %r to %s' % (message, addr)) 848 self.transport.sendto(data, addr) 849 850 851 async def main(): 852 print("Starting UDP server") 853 854 # Get a reference to the event loop as we plan to use 855 # low-level APIs. 856 loop = asyncio.get_running_loop() 857 858 # One protocol instance will be created to serve all 859 # client requests. 860 transport, protocol = await loop.create_datagram_endpoint( 861 EchoServerProtocol, 862 local_addr=('127.0.0.1', 9999)) 863 864 try: 865 await asyncio.sleep(3600) # Serve for 1 hour. 866 finally: 867 transport.close() 868 869 870 asyncio.run(main()) 871 872 873.. _asyncio-udp-echo-client-protocol: 874 875UDP Echo Client 876--------------- 877 878A UDP echo client, using the :meth:`loop.create_datagram_endpoint` 879method, sends data and closes the transport when it receives the answer:: 880 881 import asyncio 882 883 884 class EchoClientProtocol: 885 def __init__(self, message, on_con_lost): 886 self.message = message 887 self.on_con_lost = on_con_lost 888 self.transport = None 889 890 def connection_made(self, transport): 891 self.transport = transport 892 print('Send:', self.message) 893 self.transport.sendto(self.message.encode()) 894 895 def datagram_received(self, data, addr): 896 print("Received:", data.decode()) 897 898 print("Close the socket") 899 self.transport.close() 900 901 def error_received(self, exc): 902 print('Error received:', exc) 903 904 def connection_lost(self, exc): 905 print("Connection closed") 906 self.on_con_lost.set_result(True) 907 908 909 async def main(): 910 # Get a reference to the event loop as we plan to use 911 # low-level APIs. 912 loop = asyncio.get_running_loop() 913 914 on_con_lost = loop.create_future() 915 message = "Hello World!" 916 917 transport, protocol = await loop.create_datagram_endpoint( 918 lambda: EchoClientProtocol(message, on_con_lost), 919 remote_addr=('127.0.0.1', 9999)) 920 921 try: 922 await on_con_lost 923 finally: 924 transport.close() 925 926 927 asyncio.run(main()) 928 929 930.. _asyncio_example_create_connection: 931 932Connecting Existing Sockets 933--------------------------- 934 935Wait until a socket receives data using the 936:meth:`loop.create_connection` method with a protocol:: 937 938 import asyncio 939 import socket 940 941 942 class MyProtocol(asyncio.Protocol): 943 944 def __init__(self, on_con_lost): 945 self.transport = None 946 self.on_con_lost = on_con_lost 947 948 def connection_made(self, transport): 949 self.transport = transport 950 951 def data_received(self, data): 952 print("Received:", data.decode()) 953 954 # We are done: close the transport; 955 # connection_lost() will be called automatically. 956 self.transport.close() 957 958 def connection_lost(self, exc): 959 # The socket has been closed 960 self.on_con_lost.set_result(True) 961 962 963 async def main(): 964 # Get a reference to the event loop as we plan to use 965 # low-level APIs. 966 loop = asyncio.get_running_loop() 967 on_con_lost = loop.create_future() 968 969 # Create a pair of connected sockets 970 rsock, wsock = socket.socketpair() 971 972 # Register the socket to wait for data. 973 transport, protocol = await loop.create_connection( 974 lambda: MyProtocol(on_con_lost), sock=rsock) 975 976 # Simulate the reception of data from the network. 977 loop.call_soon(wsock.send, 'abc'.encode()) 978 979 try: 980 await protocol.on_con_lost 981 finally: 982 transport.close() 983 wsock.close() 984 985 asyncio.run(main()) 986 987.. seealso:: 988 989 The :ref:`watch a file descriptor for read events 990 <asyncio_example_watch_fd>` example uses the low-level 991 :meth:`loop.add_reader` method to register an FD. 992 993 The :ref:`register an open socket to wait for data using streams 994 <asyncio_example_create_connection-streams>` example uses high-level streams 995 created by the :func:`open_connection` function in a coroutine. 996 997.. _asyncio_example_subprocess_proto: 998 999loop.subprocess_exec() and SubprocessProtocol 1000--------------------------------------------- 1001 1002An example of a subprocess protocol used to get the output of a 1003subprocess and to wait for the subprocess exit. 1004 1005The subprocess is created by the :meth:`loop.subprocess_exec` method:: 1006 1007 import asyncio 1008 import sys 1009 1010 class DateProtocol(asyncio.SubprocessProtocol): 1011 def __init__(self, exit_future): 1012 self.exit_future = exit_future 1013 self.output = bytearray() 1014 self.pipe_closed = False 1015 self.exited = False 1016 1017 def pipe_connection_lost(self, fd, exc): 1018 self.pipe_closed = True 1019 self.check_for_exit() 1020 1021 def pipe_data_received(self, fd, data): 1022 self.output.extend(data) 1023 1024 def process_exited(self): 1025 self.exited = True 1026 # process_exited() method can be called before 1027 # pipe_connection_lost() method: wait until both methods are 1028 # called. 1029 self.check_for_exit() 1030 1031 def check_for_exit(self): 1032 if self.pipe_closed and self.exited: 1033 self.exit_future.set_result(True) 1034 1035 async def get_date(): 1036 # Get a reference to the event loop as we plan to use 1037 # low-level APIs. 1038 loop = asyncio.get_running_loop() 1039 1040 code = 'import datetime; print(datetime.datetime.now())' 1041 exit_future = asyncio.Future(loop=loop) 1042 1043 # Create the subprocess controlled by DateProtocol; 1044 # redirect the standard output into a pipe. 1045 transport, protocol = await loop.subprocess_exec( 1046 lambda: DateProtocol(exit_future), 1047 sys.executable, '-c', code, 1048 stdin=None, stderr=None) 1049 1050 # Wait for the subprocess exit using the process_exited() 1051 # method of the protocol. 1052 await exit_future 1053 1054 # Close the stdout pipe. 1055 transport.close() 1056 1057 # Read the output which was collected by the 1058 # pipe_data_received() method of the protocol. 1059 data = bytes(protocol.output) 1060 return data.decode('ascii').rstrip() 1061 1062 date = asyncio.run(get_date()) 1063 print(f"Current date: {date}") 1064 1065See also the :ref:`same example <asyncio_example_create_subprocess_exec>` 1066written using high-level APIs. 1067