1.. currentmodule:: asyncio 2 3.. _asyncio-streams: 4 5======= 6Streams 7======= 8 9**Source code:** :source:`Lib/asyncio/streams.py` 10 11------------------------------------------------- 12 13Streams are high-level async/await-ready primitives to work with 14network connections. Streams allow sending and receiving data without 15using callbacks or low-level protocols and transports. 16 17.. _asyncio_example_stream: 18 19Here is an example of a TCP echo client written using asyncio 20streams:: 21 22 import asyncio 23 24 async def tcp_echo_client(message): 25 reader, writer = await asyncio.open_connection( 26 '127.0.0.1', 8888) 27 28 print(f'Send: {message!r}') 29 writer.write(message.encode()) 30 await writer.drain() 31 32 data = await reader.read(100) 33 print(f'Received: {data.decode()!r}') 34 35 print('Close the connection') 36 writer.close() 37 await writer.wait_closed() 38 39 asyncio.run(tcp_echo_client('Hello World!')) 40 41 42See also the `Examples`_ section below. 43 44 45.. rubric:: Stream Functions 46 47The following top-level asyncio functions can be used to create 48and work with streams: 49 50 51.. coroutinefunction:: open_connection(host=None, port=None, *, \ 52 limit=None, ssl=None, family=0, proto=0, \ 53 flags=0, sock=None, local_addr=None, \ 54 server_hostname=None, ssl_handshake_timeout=None) 55 56 Establish a network connection and return a pair of 57 ``(reader, writer)`` objects. 58 59 The returned *reader* and *writer* objects are instances of 60 :class:`StreamReader` and :class:`StreamWriter` classes. 61 62 *limit* determines the buffer size limit used by the 63 returned :class:`StreamReader` instance. By default the *limit* 64 is set to 64 KiB. 65 66 The rest of the arguments are passed directly to 67 :meth:`loop.create_connection`. 68 69 .. versionadded:: 3.7 70 71 The *ssl_handshake_timeout* parameter. 72 73 .. deprecated-removed:: 3.8 3.10 74 75 The ``loop`` parameter. This function has been implicitly getting the 76 current running loop since 3.7. See 77 :ref:`What's New in 3.10's Removed section <whatsnew310-removed>` 78 for more information. 79 80 81.. coroutinefunction:: start_server(client_connected_cb, host=None, \ 82 port=None, *, limit=None, \ 83 family=socket.AF_UNSPEC, \ 84 flags=socket.AI_PASSIVE, sock=None, \ 85 backlog=100, ssl=None, reuse_address=None, \ 86 reuse_port=None, ssl_handshake_timeout=None, \ 87 start_serving=True) 88 89 Start a socket server. 90 91 The *client_connected_cb* callback is called whenever a new client 92 connection is established. It receives a ``(reader, writer)`` pair 93 as two arguments, instances of the :class:`StreamReader` and 94 :class:`StreamWriter` classes. 95 96 *client_connected_cb* can be a plain callable or a 97 :ref:`coroutine function <coroutine>`; if it is a coroutine function, 98 it will be automatically scheduled as a :class:`Task`. 99 100 *limit* determines the buffer size limit used by the 101 returned :class:`StreamReader` instance. By default the *limit* 102 is set to 64 KiB. 103 104 The rest of the arguments are passed directly to 105 :meth:`loop.create_server`. 106 107 .. versionadded:: 3.7 108 109 The *ssl_handshake_timeout* and *start_serving* parameters. 110 111 .. deprecated-removed:: 3.8 3.10 112 113 The ``loop`` parameter. This function has been implicitly getting the 114 current running loop since 3.7. See 115 :ref:`What's New in 3.10's Removed section <whatsnew310-removed>` 116 for more information. 117 118 119.. rubric:: Unix Sockets 120 121.. coroutinefunction:: open_unix_connection(path=None, *, limit=None, \ 122 ssl=None, sock=None, server_hostname=None, \ 123 ssl_handshake_timeout=None) 124 125 Establish a Unix socket connection and return a pair of 126 ``(reader, writer)``. 127 128 Similar to :func:`open_connection` but operates on Unix sockets. 129 130 See also the documentation of :meth:`loop.create_unix_connection`. 131 132 .. availability:: Unix. 133 134 .. versionadded:: 3.7 135 136 The *ssl_handshake_timeout* parameter. 137 138 .. versionchanged:: 3.7 139 140 The *path* parameter can now be a :term:`path-like object` 141 142 .. deprecated-removed:: 3.8 3.10 143 144 The ``loop`` parameter. This function has been implicitly getting the 145 current running loop since 3.7. See 146 :ref:`What's New in 3.10's Removed section <whatsnew310-removed>` 147 for more information. 148 149 150.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ 151 *, limit=None, sock=None, backlog=100, ssl=None, \ 152 ssl_handshake_timeout=None, start_serving=True) 153 154 Start a Unix socket server. 155 156 Similar to :func:`start_server` but works with Unix sockets. 157 158 See also the documentation of :meth:`loop.create_unix_server`. 159 160 .. availability:: Unix. 161 162 .. versionadded:: 3.7 163 164 The *ssl_handshake_timeout* and *start_serving* parameters. 165 166 .. versionchanged:: 3.7 167 168 The *path* parameter can now be a :term:`path-like object`. 169 170 .. deprecated-removed:: 3.8 3.10 171 172 The ``loop`` parameter. This function has been implicitly getting the 173 current running loop since 3.7. See 174 :ref:`What's New in 3.10's Removed section <whatsnew310-removed>` 175 for more information. 176 177 178StreamReader 179============ 180 181.. class:: StreamReader 182 183 Represents a reader object that provides APIs to read data 184 from the IO stream. 185 186 It is not recommended to instantiate *StreamReader* objects 187 directly; use :func:`open_connection` and :func:`start_server` 188 instead. 189 190 .. coroutinemethod:: read(n=-1) 191 192 Read up to *n* bytes. If *n* is not provided, or set to ``-1``, 193 read until EOF and return all read bytes. 194 195 If EOF was received and the internal buffer is empty, 196 return an empty ``bytes`` object. 197 198 .. coroutinemethod:: readline() 199 200 Read one line, where "line" is a sequence of bytes 201 ending with ``\n``. 202 203 If EOF is received and ``\n`` was not found, the method 204 returns partially read data. 205 206 If EOF is received and the internal buffer is empty, 207 return an empty ``bytes`` object. 208 209 .. coroutinemethod:: readexactly(n) 210 211 Read exactly *n* bytes. 212 213 Raise an :exc:`IncompleteReadError` if EOF is reached before *n* 214 can be read. Use the :attr:`IncompleteReadError.partial` 215 attribute to get the partially read data. 216 217 .. coroutinemethod:: readuntil(separator=b'\\n') 218 219 Read data from the stream until *separator* is found. 220 221 On success, the data and separator will be removed from the 222 internal buffer (consumed). Returned data will include the 223 separator at the end. 224 225 If the amount of data read exceeds the configured stream limit, a 226 :exc:`LimitOverrunError` exception is raised, and the data 227 is left in the internal buffer and can be read again. 228 229 If EOF is reached before the complete separator is found, 230 an :exc:`IncompleteReadError` exception is raised, and the internal 231 buffer is reset. The :attr:`IncompleteReadError.partial` attribute 232 may contain a portion of the separator. 233 234 .. versionadded:: 3.5.2 235 236 .. method:: at_eof() 237 238 Return ``True`` if the buffer is empty and :meth:`feed_eof` 239 was called. 240 241 242StreamWriter 243============ 244 245.. class:: StreamWriter 246 247 Represents a writer object that provides APIs to write data 248 to the IO stream. 249 250 It is not recommended to instantiate *StreamWriter* objects 251 directly; use :func:`open_connection` and :func:`start_server` 252 instead. 253 254 .. method:: write(data) 255 256 The method attempts to write the *data* to the underlying socket immediately. 257 If that fails, the data is queued in an internal write buffer until it can be 258 sent. 259 260 The method should be used along with the ``drain()`` method:: 261 262 stream.write(data) 263 await stream.drain() 264 265 .. method:: writelines(data) 266 267 The method writes a list (or any iterable) of bytes to the underlying socket 268 immediately. 269 If that fails, the data is queued in an internal write buffer until it can be 270 sent. 271 272 The method should be used along with the ``drain()`` method:: 273 274 stream.writelines(lines) 275 await stream.drain() 276 277 .. method:: close() 278 279 The method closes the stream and the underlying socket. 280 281 The method should be used along with the ``wait_closed()`` method:: 282 283 stream.close() 284 await stream.wait_closed() 285 286 .. method:: can_write_eof() 287 288 Return ``True`` if the underlying transport supports 289 the :meth:`write_eof` method, ``False`` otherwise. 290 291 .. method:: write_eof() 292 293 Close the write end of the stream after the buffered write 294 data is flushed. 295 296 .. attribute:: transport 297 298 Return the underlying asyncio transport. 299 300 .. method:: get_extra_info(name, default=None) 301 302 Access optional transport information; see 303 :meth:`BaseTransport.get_extra_info` for details. 304 305 .. coroutinemethod:: drain() 306 307 Wait until it is appropriate to resume writing to the stream. 308 Example:: 309 310 writer.write(data) 311 await writer.drain() 312 313 This is a flow control method that interacts with the underlying 314 IO write buffer. When the size of the buffer reaches 315 the high watermark, *drain()* blocks until the size of the 316 buffer is drained down to the low watermark and writing can 317 be resumed. When there is nothing to wait for, the :meth:`drain` 318 returns immediately. 319 320 .. method:: is_closing() 321 322 Return ``True`` if the stream is closed or in the process of 323 being closed. 324 325 .. versionadded:: 3.7 326 327 .. coroutinemethod:: wait_closed() 328 329 Wait until the stream is closed. 330 331 Should be called after :meth:`close` to wait until the underlying 332 connection is closed. 333 334 .. versionadded:: 3.7 335 336 337Examples 338======== 339 340.. _asyncio-tcp-echo-client-streams: 341 342TCP echo client using streams 343----------------------------- 344 345TCP echo client using the :func:`asyncio.open_connection` function:: 346 347 import asyncio 348 349 async def tcp_echo_client(message): 350 reader, writer = await asyncio.open_connection( 351 '127.0.0.1', 8888) 352 353 print(f'Send: {message!r}') 354 writer.write(message.encode()) 355 356 data = await reader.read(100) 357 print(f'Received: {data.decode()!r}') 358 359 print('Close the connection') 360 writer.close() 361 362 asyncio.run(tcp_echo_client('Hello World!')) 363 364 365.. seealso:: 366 367 The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>` 368 example uses the low-level :meth:`loop.create_connection` method. 369 370 371.. _asyncio-tcp-echo-server-streams: 372 373TCP echo server using streams 374----------------------------- 375 376TCP echo server using the :func:`asyncio.start_server` function:: 377 378 import asyncio 379 380 async def handle_echo(reader, writer): 381 data = await reader.read(100) 382 message = data.decode() 383 addr = writer.get_extra_info('peername') 384 385 print(f"Received {message!r} from {addr!r}") 386 387 print(f"Send: {message!r}") 388 writer.write(data) 389 await writer.drain() 390 391 print("Close the connection") 392 writer.close() 393 394 async def main(): 395 server = await asyncio.start_server( 396 handle_echo, '127.0.0.1', 8888) 397 398 addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) 399 print(f'Serving on {addrs}') 400 401 async with server: 402 await server.serve_forever() 403 404 asyncio.run(main()) 405 406 407.. seealso:: 408 409 The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>` 410 example uses the :meth:`loop.create_server` method. 411 412 413Get HTTP headers 414---------------- 415 416Simple example querying HTTP headers of the URL passed on the command line:: 417 418 import asyncio 419 import urllib.parse 420 import sys 421 422 async def print_http_headers(url): 423 url = urllib.parse.urlsplit(url) 424 if url.scheme == 'https': 425 reader, writer = await asyncio.open_connection( 426 url.hostname, 443, ssl=True) 427 else: 428 reader, writer = await asyncio.open_connection( 429 url.hostname, 80) 430 431 query = ( 432 f"HEAD {url.path or '/'} HTTP/1.0\r\n" 433 f"Host: {url.hostname}\r\n" 434 f"\r\n" 435 ) 436 437 writer.write(query.encode('latin-1')) 438 while True: 439 line = await reader.readline() 440 if not line: 441 break 442 443 line = line.decode('latin1').rstrip() 444 if line: 445 print(f'HTTP header> {line}') 446 447 # Ignore the body, close the socket 448 writer.close() 449 450 url = sys.argv[1] 451 asyncio.run(print_http_headers(url)) 452 453 454Usage:: 455 456 python example.py http://example.com/path/page.html 457 458or with HTTPS:: 459 460 python example.py https://example.com/path/page.html 461 462 463.. _asyncio_example_create_connection-streams: 464 465Register an open socket to wait for data using streams 466------------------------------------------------------ 467 468Coroutine waiting until a socket receives data using the 469:func:`open_connection` function:: 470 471 import asyncio 472 import socket 473 474 async def wait_for_data(): 475 # Get a reference to the current event loop because 476 # we want to access low-level APIs. 477 loop = asyncio.get_running_loop() 478 479 # Create a pair of connected sockets. 480 rsock, wsock = socket.socketpair() 481 482 # Register the open socket to wait for data. 483 reader, writer = await asyncio.open_connection(sock=rsock) 484 485 # Simulate the reception of data from the network 486 loop.call_soon(wsock.send, 'abc'.encode()) 487 488 # Wait for data 489 data = await reader.read(100) 490 491 # Got data, we are done: close the socket 492 print("Received:", data.decode()) 493 writer.close() 494 495 # Close the second socket 496 wsock.close() 497 498 asyncio.run(wait_for_data()) 499 500.. seealso:: 501 502 The :ref:`register an open socket to wait for data using a protocol 503 <asyncio_example_create_connection>` example uses a low-level protocol and 504 the :meth:`loop.create_connection` method. 505 506 The :ref:`watch a file descriptor for read events 507 <asyncio_example_watch_fd>` example uses the low-level 508 :meth:`loop.add_reader` method to watch a file descriptor. 509