"""Tests for streams.py."""

import gc
import os
import queue
import pickle
import socket
import sys
import threading
import unittest
from unittest import mock
from test import support
try:
    import ssl
except ImportError:
    ssl = None

import asyncio
from test.test_asyncio import utils as test_utils


def tearDownModule():
    # Reset the event loop policy so it does not leak into other modules.
    asyncio.set_event_loop_policy(None)


class StreamTests(test_utils.TestCase):
    """Tests for StreamReader/StreamWriter, open_connection() and
    start_server(), including their UNIX-socket variants.

    Each test runs against a fresh event loop created in setUp().
    """

    DATA = b'line1\nline2\nline3\n'

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        # just in case we have transport close callbacks
        test_utils.run_briefly(self.loop)

        self.loop.close()
        gc.collect()
        super().tearDown()

    @mock.patch('asyncio.streams.events')
    def test_ctor_global_loop(self, m_events):
        stream = asyncio.StreamReader()
        self.assertIs(stream._loop, m_events.get_event_loop.return_value)

    def _basetest_open_connection(self, open_connection_fut):
        # Shared body of test_open_connection() and
        # test_open_unix_connection(): issue a GET request over the
        # connection and check the response.  Nothing may be reported
        # to the loop's exception handler.
        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
        with self.assertWarns(DeprecationWarning):
            reader, writer = self.loop.run_until_complete(open_connection_fut)
        writer.write(b'GET / HTTP/1.0\r\n\r\n')
        f = reader.readline()
        data = self.loop.run_until_complete(f)
        self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
        f = reader.read()
        data = self.loop.run_until_complete(f)
        self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
        writer.close()
        self.assertEqual(messages, [])

    def test_open_connection(self):
        with test_utils.run_test_server() as httpd:
            conn_fut = asyncio.open_connection(*httpd.address,
                                               loop=self.loop)
            self._basetest_open_connection(conn_fut)

    @support.skip_unless_bind_unix_socket
    def test_open_unix_connection(self):
        with test_utils.run_test_unix_server() as httpd:
            conn_fut = asyncio.open_unix_connection(httpd.address,
                                                    loop=self.loop)
            self._basetest_open_connection(conn_fut)

    def _basetest_open_connection_no_loop_ssl(self, open_connection_fut):
        # Shared body of the two *_no_loop_ssl tests.  The current event
        # loop is unset as soon as the connection attempt finishes (even
        # on failure); the rest of the exchange runs on self.loop only.
        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
        try:
            with self.assertWarns(DeprecationWarning):
                reader, writer = self.loop.run_until_complete(open_connection_fut)
        finally:
            asyncio.set_event_loop(None)
        writer.write(b'GET / HTTP/1.0\r\n\r\n')
        f = reader.read()
        data = self.loop.run_until_complete(f)
        self.assertTrue(data.endswith(b'\r\n\r\nTest message'))

        writer.close()
        self.assertEqual(messages, [])

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_open_connection_no_loop_ssl(self):
        with test_utils.run_test_server(use_ssl=True) as httpd:
            conn_fut = asyncio.open_connection(
                *httpd.address,
                ssl=test_utils.dummy_ssl_context(),
                loop=self.loop)

            self._basetest_open_connection_no_loop_ssl(conn_fut)

    @support.skip_unless_bind_unix_socket
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_open_unix_connection_no_loop_ssl(self):
        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
            conn_fut = asyncio.open_unix_connection(
                httpd.address,
                ssl=test_utils.dummy_ssl_context(),
                server_hostname='',
                loop=self.loop)

            self._basetest_open_connection_no_loop_ssl(conn_fut)

    def _basetest_open_connection_error(self, open_connection_fut):
        # Shared body of the two *_connection_error tests: an exception
        # passed to the protocol's connection_lost() must be raised by a
        # subsequent read().
        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
        with self.assertWarns(DeprecationWarning):
            reader, writer = self.loop.run_until_complete(open_connection_fut)
        writer._protocol.connection_lost(ZeroDivisionError())
        f = reader.read()
        with self.assertRaises(ZeroDivisionError):
            self.loop.run_until_complete(f)
        writer.close()
        test_utils.run_briefly(self.loop)
        self.assertEqual(messages, [])

    def test_open_connection_error(self):
        with test_utils.run_test_server() as httpd:
            conn_fut = asyncio.open_connection(*httpd.address,
                                               loop=self.loop)
            self._basetest_open_connection_error(conn_fut)

    @support.skip_unless_bind_unix_socket
    def test_open_unix_connection_error(self):
        with test_utils.run_test_unix_server() as httpd:
            conn_fut = asyncio.open_unix_connection(httpd.address,
                                                    loop=self.loop)
            self._basetest_open_connection_error(conn_fut)

    def test_feed_empty_data(self):
        stream = asyncio.StreamReader(loop=self.loop)

        stream.feed_data(b'')
        self.assertEqual(b'', stream._buffer)

    def test_feed_nonempty_data(self):
        stream = asyncio.StreamReader(loop=self.loop)

        stream.feed_data(self.DATA)
        self.assertEqual(self.DATA, stream._buffer)

    def test_read_zero(self):
        # Read zero bytes.
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(self.DATA)

        data = self.loop.run_until_complete(stream.read(0))
        self.assertEqual(b'', data)
        self.assertEqual(self.DATA, stream._buffer)

    def test_read(self):
        # Read bytes.
        stream = asyncio.StreamReader(loop=self.loop)
        read_task = self.loop.create_task(stream.read(30))

        def cb():
            stream.feed_data(self.DATA)
        self.loop.call_soon(cb)

        data = self.loop.run_until_complete(read_task)
        self.assertEqual(self.DATA, data)
        self.assertEqual(b'', stream._buffer)

    def test_read_line_breaks(self):
        # Read bytes without line breaks.
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'line1')
        stream.feed_data(b'line2')

        data = self.loop.run_until_complete(stream.read(5))

        self.assertEqual(b'line1', data)
        self.assertEqual(b'line2', stream._buffer)

    def test_read_eof(self):
        # Read bytes, stop at eof.
        stream = asyncio.StreamReader(loop=self.loop)
        read_task = self.loop.create_task(stream.read(1024))

        def cb():
            stream.feed_eof()
        self.loop.call_soon(cb)

        data = self.loop.run_until_complete(read_task)
        self.assertEqual(b'', data)
        self.assertEqual(b'', stream._buffer)

    def test_read_until_eof(self):
        # Read all bytes until eof.
        stream = asyncio.StreamReader(loop=self.loop)
        read_task = self.loop.create_task(stream.read(-1))

        def cb():
            stream.feed_data(b'chunk1\n')
            stream.feed_data(b'chunk2')
            stream.feed_eof()
        self.loop.call_soon(cb)

        data = self.loop.run_until_complete(read_task)

        self.assertEqual(b'chunk1\nchunk2', data)
        self.assertEqual(b'', stream._buffer)

    def test_read_exception(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'line\n')

        data = self.loop.run_until_complete(stream.read(2))
        self.assertEqual(b'li', data)

        stream.set_exception(ValueError())
        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.read(2))

    def test_invalid_limit(self):
        with self.assertRaisesRegex(ValueError, 'imit'):
            asyncio.StreamReader(limit=0, loop=self.loop)

        with self.assertRaisesRegex(ValueError, 'imit'):
            asyncio.StreamReader(limit=-1, loop=self.loop)

    def test_read_limit(self):
        stream = asyncio.StreamReader(limit=3, loop=self.loop)
        stream.feed_data(b'chunk')
        data = self.loop.run_until_complete(stream.read(5))
        self.assertEqual(b'chunk', data)
        self.assertEqual(b'', stream._buffer)

    def test_readline(self):
        # Read one line. 'readline' will need to wait for the data
        # to come from 'cb'
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'chunk1 ')
        read_task = self.loop.create_task(stream.readline())

        def cb():
            stream.feed_data(b'chunk2 ')
            stream.feed_data(b'chunk3 ')
            stream.feed_data(b'\n chunk4')
        self.loop.call_soon(cb)

        line = self.loop.run_until_complete(read_task)
        self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
        self.assertEqual(b' chunk4', stream._buffer)

    def test_readline_limit_with_existing_data(self):
        # Read one line. The data is in StreamReader's buffer
        # before the event loop is run.

        stream = asyncio.StreamReader(limit=3, loop=self.loop)
        stream.feed_data(b'li')
        stream.feed_data(b'ne1\nline2\n')

        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.readline())
        # The buffer should contain the remaining data after exception
        self.assertEqual(b'line2\n', stream._buffer)

        stream = asyncio.StreamReader(limit=3, loop=self.loop)
        stream.feed_data(b'li')
        stream.feed_data(b'ne1')
        stream.feed_data(b'li')

        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.readline())
        # No b'\n' at the end. The 'limit' is set to 3. So before
        # waiting for the new data in buffer, 'readline' will consume
        # the entire buffer, and since the length of the consumed data
        # is more than 3, it will raise a ValueError. The buffer is
        # expected to be empty now.
        self.assertEqual(b'', stream._buffer)

    def test_at_eof(self):
        stream = asyncio.StreamReader(loop=self.loop)
        self.assertFalse(stream.at_eof())

        stream.feed_data(b'some data\n')
        self.assertFalse(stream.at_eof())

        self.loop.run_until_complete(stream.readline())
        self.assertFalse(stream.at_eof())

        stream.feed_data(b'some data\n')
        stream.feed_eof()
        self.loop.run_until_complete(stream.readline())
        self.assertTrue(stream.at_eof())

    def test_readline_limit(self):
        # Read one line. StreamReaders are fed with data after
        # their 'readline' methods are called.

        stream = asyncio.StreamReader(limit=7, loop=self.loop)
        def cb():
            stream.feed_data(b'chunk1')
            stream.feed_data(b'chunk2')
            stream.feed_data(b'chunk3\n')
            stream.feed_eof()
        self.loop.call_soon(cb)

        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.readline())
        # The buffer had just one line of data, and after raising
        # a ValueError it should be empty.
        self.assertEqual(b'', stream._buffer)

        stream = asyncio.StreamReader(limit=7, loop=self.loop)
        def cb():
            stream.feed_data(b'chunk1')
            stream.feed_data(b'chunk2\n')
            stream.feed_data(b'chunk3\n')
            stream.feed_eof()
        self.loop.call_soon(cb)

        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.readline())
        self.assertEqual(b'chunk3\n', stream._buffer)

        # check strictness of the limit
        stream = asyncio.StreamReader(limit=7, loop=self.loop)
        stream.feed_data(b'1234567\n')
        line = self.loop.run_until_complete(stream.readline())
        self.assertEqual(b'1234567\n', line)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'12345678\n')
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(stream.readline())
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'12345678')
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(stream.readline())
        self.assertEqual(b'', stream._buffer)

    def test_readline_nolimit_nowait(self):
        # All needed data for the first 'readline' call will be
        # in the buffer.
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(self.DATA[:6])
        stream.feed_data(self.DATA[6:])

        line = self.loop.run_until_complete(stream.readline())

        self.assertEqual(b'line1\n', line)
        self.assertEqual(b'line2\nline3\n', stream._buffer)

    def test_readline_eof(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'some data')
        stream.feed_eof()

        line = self.loop.run_until_complete(stream.readline())
        self.assertEqual(b'some data', line)

    def test_readline_empty_eof(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_eof()

        line = self.loop.run_until_complete(stream.readline())
        self.assertEqual(b'', line)

    def test_readline_read_byte_count(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(self.DATA)

        self.loop.run_until_complete(stream.readline())

        data = self.loop.run_until_complete(stream.read(7))

        self.assertEqual(b'line2\nl', data)
        self.assertEqual(b'ine3\n', stream._buffer)

    def test_readline_exception(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'line\n')

        data = self.loop.run_until_complete(stream.readline())
        self.assertEqual(b'line\n', data)

        stream.set_exception(ValueError())
        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.readline())
        self.assertEqual(b'', stream._buffer)

    def test_readuntil_separator(self):
        stream = asyncio.StreamReader(loop=self.loop)
        with self.assertRaisesRegex(ValueError, 'Separator should be'):
            self.loop.run_until_complete(stream.readuntil(separator=b''))

    def test_readuntil_multi_chunks(self):
        stream = asyncio.StreamReader(loop=self.loop)

        stream.feed_data(b'lineAAA')
        data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA'))
        self.assertEqual(b'lineAAA', data)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'lineAAA')
        data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
        self.assertEqual(b'lineAAA', data)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'lineAAAxxx')
        data = self.loop.run_until_complete(stream.readuntil(b'AAA'))
        self.assertEqual(b'lineAAA', data)
        self.assertEqual(b'xxx', stream._buffer)

    def test_readuntil_multi_chunks_1(self):
        stream = asyncio.StreamReader(loop=self.loop)

        stream.feed_data(b'QWEaa')
        stream.feed_data(b'XYaa')
        stream.feed_data(b'a')
        data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
        self.assertEqual(b'QWEaaXYaaa', data)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'QWEaa')
        stream.feed_data(b'XYa')
        stream.feed_data(b'aa')
        data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
        self.assertEqual(b'QWEaaXYaaa', data)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'aaa')
        data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
        self.assertEqual(b'aaa', data)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'Xaaa')
        data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
        self.assertEqual(b'Xaaa', data)
        self.assertEqual(b'', stream._buffer)

        stream.feed_data(b'XXX')
        stream.feed_data(b'a')
        stream.feed_data(b'a')
        stream.feed_data(b'a')
        data = self.loop.run_until_complete(stream.readuntil(b'aaa'))
        self.assertEqual(b'XXXaaa', data)
        self.assertEqual(b'', stream._buffer)

    def test_readuntil_eof(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'some dataAA')
        stream.feed_eof()

        with self.assertRaises(asyncio.IncompleteReadError) as cm:
            self.loop.run_until_complete(stream.readuntil(b'AAA'))
        self.assertEqual(cm.exception.partial, b'some dataAA')
        self.assertIsNone(cm.exception.expected)
        self.assertEqual(b'', stream._buffer)

    def test_readuntil_limit_found_sep(self):
        stream = asyncio.StreamReader(loop=self.loop, limit=3)
        stream.feed_data(b'some dataAA')
        with self.assertRaisesRegex(asyncio.LimitOverrunError,
                                    'not found'):
            self.loop.run_until_complete(stream.readuntil(b'AAA'))

        self.assertEqual(b'some dataAA', stream._buffer)

        stream.feed_data(b'A')
        with self.assertRaisesRegex(asyncio.LimitOverrunError,
                                    'is found'):
            self.loop.run_until_complete(stream.readuntil(b'AAA'))

        self.assertEqual(b'some dataAAA', stream._buffer)

    def test_readexactly_zero_or_less(self):
        # Read exact number of bytes (zero or less).
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(self.DATA)

        data = self.loop.run_until_complete(stream.readexactly(0))
        self.assertEqual(b'', data)
        self.assertEqual(self.DATA, stream._buffer)

        with self.assertRaisesRegex(ValueError, 'less than zero'):
            self.loop.run_until_complete(stream.readexactly(-1))
        self.assertEqual(self.DATA, stream._buffer)

    def test_readexactly(self):
        # Read exact number of bytes.
        stream = asyncio.StreamReader(loop=self.loop)

        n = 2 * len(self.DATA)
        read_task = self.loop.create_task(stream.readexactly(n))

        def cb():
            stream.feed_data(self.DATA)
            stream.feed_data(self.DATA)
            stream.feed_data(self.DATA)
        self.loop.call_soon(cb)

        data = self.loop.run_until_complete(read_task)
        self.assertEqual(self.DATA + self.DATA, data)
        self.assertEqual(self.DATA, stream._buffer)

    def test_readexactly_limit(self):
        stream = asyncio.StreamReader(limit=3, loop=self.loop)
        stream.feed_data(b'chunk')
        data = self.loop.run_until_complete(stream.readexactly(5))
        self.assertEqual(b'chunk', data)
        self.assertEqual(b'', stream._buffer)

    def test_readexactly_eof(self):
        # Read exact number of bytes (eof).
        stream = asyncio.StreamReader(loop=self.loop)
        n = 2 * len(self.DATA)
        read_task = self.loop.create_task(stream.readexactly(n))

        def cb():
            stream.feed_data(self.DATA)
            stream.feed_eof()
        self.loop.call_soon(cb)

        with self.assertRaises(asyncio.IncompleteReadError) as cm:
            self.loop.run_until_complete(read_task)
        self.assertEqual(cm.exception.partial, self.DATA)
        self.assertEqual(cm.exception.expected, n)
        self.assertEqual(str(cm.exception),
                         '18 bytes read on a total of 36 expected bytes')
        self.assertEqual(b'', stream._buffer)

    def test_readexactly_exception(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'line\n')

        data = self.loop.run_until_complete(stream.readexactly(2))
        self.assertEqual(b'li', data)

        stream.set_exception(ValueError())
        self.assertRaises(
            ValueError, self.loop.run_until_complete, stream.readexactly(2))

    def test_exception(self):
        stream = asyncio.StreamReader(loop=self.loop)
        self.assertIsNone(stream.exception())

        exc = ValueError()
        stream.set_exception(exc)
        self.assertIs(stream.exception(), exc)

    def test_exception_waiter(self):
        stream = asyncio.StreamReader(loop=self.loop)

        async def set_err():
            stream.set_exception(ValueError())

        t1 = self.loop.create_task(stream.readline())
        t2 = self.loop.create_task(set_err())

        self.loop.run_until_complete(asyncio.wait([t1, t2]))

        self.assertRaises(ValueError, t1.result)

    def test_exception_cancel(self):
        stream = asyncio.StreamReader(loop=self.loop)

        t = self.loop.create_task(stream.readline())
        test_utils.run_briefly(self.loop)
        t.cancel()
        test_utils.run_briefly(self.loop)
        # The following line fails if set_exception() isn't careful.
        stream.set_exception(RuntimeError('message'))
        test_utils.run_briefly(self.loop)
        self.assertIs(stream._waiter, None)

    def test_start_server(self):

        class MyServer:

            def __init__(self, loop):
                self.server = None
                self.loop = loop

            async def handle_client(self, client_reader, client_writer):
                # Echo one line back to the client, then close.
                data = await client_reader.readline()
                client_writer.write(data)
                await client_writer.drain()
                client_writer.close()
                await client_writer.wait_closed()

            def start(self):
                sock = socket.create_server(('127.0.0.1', 0))
                self.server = self.loop.run_until_complete(
                    asyncio.start_server(self.handle_client,
                                         sock=sock,
                                         loop=self.loop))
                return sock.getsockname()

            def handle_client_callback(self, client_reader, client_writer):
                self.loop.create_task(self.handle_client(client_reader,
                                                         client_writer))

            def start_callback(self):
                # Bind to port 0 only to obtain a free address, then
                # release the socket and start the server on host/port.
                sock = socket.create_server(('127.0.0.1', 0))
                addr = sock.getsockname()
                sock.close()
                self.server = self.loop.run_until_complete(
                    asyncio.start_server(self.handle_client_callback,
                                         host=addr[0], port=addr[1],
                                         loop=self.loop))
                return addr

            def stop(self):
                if self.server is not None:
                    self.server.close()
                    self.loop.run_until_complete(self.server.wait_closed())
                    self.server = None

        async def client(addr):
            with self.assertWarns(DeprecationWarning):
                reader, writer = await asyncio.open_connection(
                    *addr, loop=self.loop)
            # send a line
            writer.write(b"hello world!\n")
            # read it back
            msgback = await reader.readline()
            writer.close()
            await writer.wait_closed()
            return msgback

        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        # test the server variant with a coroutine as client handler
        server = MyServer(self.loop)
        with self.assertWarns(DeprecationWarning):
            addr = server.start()
        msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
        server.stop()
        self.assertEqual(msg, b"hello world!\n")

        # test the server variant with a callback as client handler
        server = MyServer(self.loop)
        with self.assertWarns(DeprecationWarning):
            addr = server.start_callback()
        msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
        server.stop()
        self.assertEqual(msg, b"hello world!\n")

        self.assertEqual(messages, [])

    @support.skip_unless_bind_unix_socket
    def test_start_unix_server(self):

        class MyServer:

            def __init__(self, loop, path):
                self.server = None
                self.loop = loop
                self.path = path

            async def handle_client(self, client_reader, client_writer):
                # Echo one line back to the client, then close.
                data = await client_reader.readline()
                client_writer.write(data)
                await client_writer.drain()
                client_writer.close()
                await client_writer.wait_closed()

            def start(self):
                self.server = self.loop.run_until_complete(
                    asyncio.start_unix_server(self.handle_client,
                                              path=self.path,
                                              loop=self.loop))

            def handle_client_callback(self, client_reader, client_writer):
                self.loop.create_task(self.handle_client(client_reader,
                                                         client_writer))

            def start_callback(self):
                start = asyncio.start_unix_server(self.handle_client_callback,
                                                  path=self.path,
                                                  loop=self.loop)
                self.server = self.loop.run_until_complete(start)

            def stop(self):
                if self.server is not None:
                    self.server.close()
                    self.loop.run_until_complete(self.server.wait_closed())
                    self.server = None

        async def client(path):
            with self.assertWarns(DeprecationWarning):
                reader, writer = await asyncio.open_unix_connection(
                    path, loop=self.loop)
            # send a line
            writer.write(b"hello world!\n")
            # read it back
            msgback = await reader.readline()
            writer.close()
            await writer.wait_closed()
            return msgback

        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        # test the server variant with a coroutine as client handler
        with test_utils.unix_socket_path() as path:
            server = MyServer(self.loop, path)
            with self.assertWarns(DeprecationWarning):
                server.start()
            msg = self.loop.run_until_complete(
                self.loop.create_task(client(path)))
            server.stop()
            self.assertEqual(msg, b"hello world!\n")

        # test the server variant with a callback as client handler
        with test_utils.unix_socket_path() as path:
            server = MyServer(self.loop, path)
            with self.assertWarns(DeprecationWarning):
                server.start_callback()
            msg = self.loop.run_until_complete(
                self.loop.create_task(client(path)))
            server.stop()
            self.assertEqual(msg, b"hello world!\n")

        self.assertEqual(messages, [])

    @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
    def test_read_all_from_pipe_reader(self):
        # See asyncio issue 168.  This test is derived from the example
        # subprocess_attach_read_pipe.py, but we configure the
        # StreamReader's limit so that twice it is less than the size
        # of the data written.  Also we must explicitly attach a child
        # watcher to the event loop.

        code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
        rfd, wfd = os.pipe()
        args = [sys.executable, '-c', code, str(wfd)]

        pipe = open(rfd, 'rb', 0)
        reader = asyncio.StreamReader(loop=self.loop, limit=1)
        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
        transport, _ = self.loop.run_until_complete(
            self.loop.connect_read_pipe(lambda: protocol, pipe))

        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(self.loop)
        try:
            asyncio.set_child_watcher(watcher)
            create = asyncio.create_subprocess_exec(
                *args,
                pass_fds={wfd},
            )
            proc = self.loop.run_until_complete(create)
            self.loop.run_until_complete(proc.wait())
        finally:
            asyncio.set_child_watcher(None)

        # Close our copy of the write end before reading until EOF.
        os.close(wfd)
        data = self.loop.run_until_complete(reader.read(-1))
        self.assertEqual(data, b'data')

    def test_streamreader_constructor(self):
        self.addCleanup(asyncio.set_event_loop, None)
        asyncio.set_event_loop(self.loop)

        # asyncio issue #184: Ensure that StreamReader constructor
        # retrieves the current loop if the loop parameter is not set
        reader = asyncio.StreamReader()
        self.assertIs(reader._loop, self.loop)

    def test_streamreaderprotocol_constructor(self):
        self.addCleanup(asyncio.set_event_loop, None)
        asyncio.set_event_loop(self.loop)

        # asyncio issue #184: Ensure that StreamReaderProtocol constructor
        # retrieves the current loop if the loop parameter is not set
        reader = mock.Mock()
        protocol = asyncio.StreamReaderProtocol(reader)
        self.assertIs(protocol._loop, self.loop)

    def test_drain_raises(self):
        # See http://bugs.python.org/issue25441

        # This test should not use asyncio for the mock server; the
        # whole point of the test is to test for a bug in drain()
        # where it never gives up the event loop but the socket is
        # closed on the server side.

        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
        q = queue.Queue()

        def server():
            # Runs in a separate thread.
            with socket.create_server(('localhost', 0)) as sock:
                addr = sock.getsockname()
                q.put(addr)
                clt, _ = sock.accept()
                clt.close()

        async def client(host, port):
            with self.assertWarns(DeprecationWarning):
                reader, writer = await asyncio.open_connection(
                    host, port, loop=self.loop)

            while True:
                writer.write(b"foo\n")
                await writer.drain()

        # Start the server thread and wait for it to be listening.
        # daemon=True replaces the deprecated Thread.setDaemon().
        thread = threading.Thread(target=server, daemon=True)
        thread.start()
        addr = q.get()

        # Should not be stuck in an infinite loop.
        with self.assertRaises((ConnectionResetError, ConnectionAbortedError,
                                BrokenPipeError)):
            self.loop.run_until_complete(client(*addr))

        # Clean up the thread.  (Only on success; on failure, it may
        # be stuck in accept().)
        thread.join()
        self.assertEqual([], messages)

    def test___repr__(self):
        stream = asyncio.StreamReader(loop=self.loop)
        self.assertEqual("<StreamReader>", repr(stream))

    def test___repr__nondefault_limit(self):
        stream = asyncio.StreamReader(loop=self.loop, limit=123)
        self.assertEqual("<StreamReader limit=123>", repr(stream))

    def test___repr__eof(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_eof()
        self.assertEqual("<StreamReader eof>", repr(stream))

    def test___repr__data(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(b'data')
        self.assertEqual("<StreamReader 4 bytes>", repr(stream))

    def test___repr__exception(self):
        stream = asyncio.StreamReader(loop=self.loop)
        exc = RuntimeError()
        stream.set_exception(exc)
        self.assertEqual("<StreamReader exception=RuntimeError()>",
                         repr(stream))

    def test___repr__waiter(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream._waiter = asyncio.Future(loop=self.loop)
        self.assertRegex(
            repr(stream),
            r"<StreamReader waiter=<Future pending[\S ]*>>")
        stream._waiter.set_result(None)
        self.loop.run_until_complete(stream._waiter)
        stream._waiter = None
        self.assertEqual("<StreamReader>", repr(stream))

    def test___repr__transport(self):
        stream = asyncio.StreamReader(loop=self.loop)
        stream._transport = mock.Mock()
        stream._transport.__repr__ = mock.Mock()
        stream._transport.__repr__.return_value = "<Transport>"
        self.assertEqual("<StreamReader transport=<Transport>>", repr(stream))

    def test_IncompleteReadError_pickleable(self):
        e = asyncio.IncompleteReadError(b'abc', 10)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(pickle_protocol=proto):
                e2 = pickle.loads(pickle.dumps(e, protocol=proto))
                self.assertEqual(str(e), str(e2))
                self.assertEqual(e.partial, e2.partial)
                self.assertEqual(e.expected, e2.expected)

    def test_LimitOverrunError_pickleable(self):
        e = asyncio.LimitOverrunError('message', 10)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(pickle_protocol=proto):
                e2 = pickle.loads(pickle.dumps(e, protocol=proto))
                self.assertEqual(str(e), str(e2))
                self.assertEqual(e.consumed, e2.consumed)

    def test_wait_closed_on_close(self):
        with test_utils.run_test_server() as httpd:
            with self.assertWarns(DeprecationWarning):
                rd, wr = self.loop.run_until_complete(
                    asyncio.open_connection(*httpd.address, loop=self.loop))

            wr.write(b'GET / HTTP/1.0\r\n\r\n')
            f = rd.readline()
            data = self.loop.run_until_complete(f)
            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
            f = rd.read()
            data = self.loop.run_until_complete(f)
            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
            self.assertFalse(wr.is_closing())
            wr.close()
            self.assertTrue(wr.is_closing())
            self.loop.run_until_complete(wr.wait_closed())

    def test_wait_closed_on_close_with_unread_data(self):
        with test_utils.run_test_server() as httpd:
            with self.assertWarns(DeprecationWarning):
                rd, wr = self.loop.run_until_complete(
                    asyncio.open_connection(*httpd.address, loop=self.loop))

            wr.write(b'GET / HTTP/1.0\r\n\r\n')
            f = rd.readline()
            data = self.loop.run_until_complete(f)
            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
            wr.close()
            self.loop.run_until_complete(wr.wait_closed())

    def test_async_writer_api(self):
        async def inner(httpd):
            rd, wr = await asyncio.open_connection(*httpd.address)

            wr.write(b'GET / HTTP/1.0\r\n\r\n')
            data = await rd.readline()
            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
            data = await rd.read()
            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
            wr.close()
            await wr.wait_closed()

        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        with test_utils.run_test_server() as httpd:
            self.loop.run_until_complete(inner(httpd))

        self.assertEqual(messages, [])

    def test_async_writer_api_exception_after_close(self):
        async def inner(httpd):
            rd, wr = await asyncio.open_connection(*httpd.address)

            wr.write(b'GET / HTTP/1.0\r\n\r\n')
            data = await rd.readline()
            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
            data = await rd.read()
            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
            wr.close()
            with self.assertRaises(ConnectionResetError):
                wr.write(b'data')
                await wr.drain()

        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        with test_utils.run_test_server() as httpd:
            self.loop.run_until_complete(inner(httpd))

        self.assertEqual(messages, [])

    def test_eof_feed_when_closing_writer(self):
        # See http://bugs.python.org/issue35065
        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        with test_utils.run_test_server() as httpd:
            with self.assertWarns(DeprecationWarning):
                rd, wr = self.loop.run_until_complete(
                    asyncio.open_connection(*httpd.address,
                                            loop=self.loop))

            wr.close()
            f = wr.wait_closed()
            self.loop.run_until_complete(f)
            # Use unittest assertions rather than bare ``assert`` so the
            # checks still run under ``python -O``.
            self.assertTrue(rd.at_eof())
            f = rd.read()
            data = self.loop.run_until_complete(f)
            self.assertEqual(data, b'')

        self.assertEqual(messages, [])


if __name__ == '__main__':
    unittest.main()