1"""Tests for proactor_events.py""" 2 3import io 4import socket 5import unittest 6import sys 7from unittest import mock 8 9import asyncio 10from asyncio.proactor_events import BaseProactorEventLoop 11from asyncio.proactor_events import _ProactorSocketTransport 12from asyncio.proactor_events import _ProactorWritePipeTransport 13from asyncio.proactor_events import _ProactorDuplexPipeTransport 14from asyncio.proactor_events import _ProactorDatagramTransport 15from test import support 16from test.test_asyncio import utils as test_utils 17 18 19def tearDownModule(): 20 asyncio.set_event_loop_policy(None) 21 22 23def close_transport(transport): 24 # Don't call transport.close() because the event loop and the IOCP proactor 25 # are mocked 26 if transport._sock is None: 27 return 28 transport._sock.close() 29 transport._sock = None 30 31 32class ProactorSocketTransportTests(test_utils.TestCase): 33 34 def setUp(self): 35 super().setUp() 36 self.loop = self.new_test_loop() 37 self.addCleanup(self.loop.close) 38 self.proactor = mock.Mock() 39 self.loop._proactor = self.proactor 40 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 41 self.sock = mock.Mock(socket.socket) 42 43 def socket_transport(self, waiter=None): 44 transport = _ProactorSocketTransport(self.loop, self.sock, 45 self.protocol, waiter=waiter) 46 self.addCleanup(close_transport, transport) 47 return transport 48 49 def test_ctor(self): 50 fut = self.loop.create_future() 51 tr = self.socket_transport(waiter=fut) 52 test_utils.run_briefly(self.loop) 53 self.assertIsNone(fut.result()) 54 self.protocol.connection_made(tr) 55 self.proactor.recv.assert_called_with(self.sock, 32768) 56 57 def test_loop_reading(self): 58 tr = self.socket_transport() 59 tr._loop_reading() 60 self.loop._proactor.recv.assert_called_with(self.sock, 32768) 61 self.assertFalse(self.protocol.data_received.called) 62 self.assertFalse(self.protocol.eof_received.called) 63 64 def test_loop_reading_data(self): 65 res = self.loop.create_future() 66 res.set_result(b'data') 67 68 tr = self.socket_transport() 69 tr._read_fut = res 70 tr._loop_reading(res) 71 self.loop._proactor.recv.assert_called_with(self.sock, 32768) 72 self.protocol.data_received.assert_called_with(b'data') 73 74 def test_loop_reading_no_data(self): 75 res = self.loop.create_future() 76 res.set_result(b'') 77 78 tr = self.socket_transport() 79 self.assertRaises(AssertionError, tr._loop_reading, res) 80 81 tr.close = mock.Mock() 82 tr._read_fut = res 83 tr._loop_reading(res) 84 self.assertFalse(self.loop._proactor.recv.called) 85 self.assertTrue(self.protocol.eof_received.called) 86 self.assertTrue(tr.close.called) 87 88 def test_loop_reading_aborted(self): 89 err = self.loop._proactor.recv.side_effect = ConnectionAbortedError() 90 91 tr = self.socket_transport() 92 tr._fatal_error = mock.Mock() 93 tr._loop_reading() 94 tr._fatal_error.assert_called_with( 95 err, 96 'Fatal read error on pipe transport') 97 98 def test_loop_reading_aborted_closing(self): 99 self.loop._proactor.recv.side_effect = ConnectionAbortedError() 100 101 tr = self.socket_transport() 102 tr._closing = True 103 tr._fatal_error = mock.Mock() 104 tr._loop_reading() 105 self.assertFalse(tr._fatal_error.called) 106 107 def test_loop_reading_aborted_is_fatal(self): 108 self.loop._proactor.recv.side_effect = ConnectionAbortedError() 109 tr = self.socket_transport() 110 tr._closing = False 111 tr._fatal_error = mock.Mock() 112 tr._loop_reading() 113 self.assertTrue(tr._fatal_error.called) 114 115 def test_loop_reading_conn_reset_lost(self): 116 err = self.loop._proactor.recv.side_effect = ConnectionResetError() 117 118 tr = self.socket_transport() 119 tr._closing = False 120 tr._fatal_error = mock.Mock() 121 tr._force_close = mock.Mock() 122 tr._loop_reading() 123 self.assertFalse(tr._fatal_error.called) 124 tr._force_close.assert_called_with(err) 125 126 def test_loop_reading_exception(self): 127 err = self.loop._proactor.recv.side_effect = (OSError()) 128 129 tr = self.socket_transport() 130 tr._fatal_error = mock.Mock() 131 tr._loop_reading() 132 tr._fatal_error.assert_called_with( 133 err, 134 'Fatal read error on pipe transport') 135 136 def test_write(self): 137 tr = self.socket_transport() 138 tr._loop_writing = mock.Mock() 139 tr.write(b'data') 140 self.assertEqual(tr._buffer, None) 141 tr._loop_writing.assert_called_with(data=b'data') 142 143 def test_write_no_data(self): 144 tr = self.socket_transport() 145 tr.write(b'') 146 self.assertFalse(tr._buffer) 147 148 def test_write_more(self): 149 tr = self.socket_transport() 150 tr._write_fut = mock.Mock() 151 tr._loop_writing = mock.Mock() 152 tr.write(b'data') 153 self.assertEqual(tr._buffer, b'data') 154 self.assertFalse(tr._loop_writing.called) 155 156 def test_loop_writing(self): 157 tr = self.socket_transport() 158 tr._buffer = bytearray(b'data') 159 tr._loop_writing() 160 self.loop._proactor.send.assert_called_with(self.sock, b'data') 161 self.loop._proactor.send.return_value.add_done_callback.\ 162 assert_called_with(tr._loop_writing) 163 164 @mock.patch('asyncio.proactor_events.logger') 165 def test_loop_writing_err(self, m_log): 166 err = self.loop._proactor.send.side_effect = OSError() 167 tr = self.socket_transport() 168 tr._fatal_error = mock.Mock() 169 tr._buffer = [b'da', b'ta'] 170 tr._loop_writing() 171 tr._fatal_error.assert_called_with( 172 err, 173 'Fatal write error on pipe transport') 174 tr._conn_lost = 1 175 176 tr.write(b'data') 177 tr.write(b'data') 178 tr.write(b'data') 179 tr.write(b'data') 180 tr.write(b'data') 181 self.assertEqual(tr._buffer, None) 182 m_log.warning.assert_called_with('socket.send() raised exception.') 183 184 def test_loop_writing_stop(self): 185 fut = self.loop.create_future() 186 fut.set_result(b'data') 187 188 tr = self.socket_transport() 189 tr._write_fut = fut 190 tr._loop_writing(fut) 191 self.assertIsNone(tr._write_fut) 192 193 def test_loop_writing_closing(self): 194 fut = self.loop.create_future() 195 fut.set_result(1) 196 197 tr = self.socket_transport() 198 tr._write_fut = fut 199 tr.close() 200 tr._loop_writing(fut) 201 self.assertIsNone(tr._write_fut) 202 test_utils.run_briefly(self.loop) 203 self.protocol.connection_lost.assert_called_with(None) 204 205 def test_abort(self): 206 tr = self.socket_transport() 207 tr._force_close = mock.Mock() 208 tr.abort() 209 tr._force_close.assert_called_with(None) 210 211 def test_close(self): 212 tr = self.socket_transport() 213 tr.close() 214 test_utils.run_briefly(self.loop) 215 self.protocol.connection_lost.assert_called_with(None) 216 self.assertTrue(tr.is_closing()) 217 self.assertEqual(tr._conn_lost, 1) 218 219 self.protocol.connection_lost.reset_mock() 220 tr.close() 221 test_utils.run_briefly(self.loop) 222 self.assertFalse(self.protocol.connection_lost.called) 223 224 def test_close_write_fut(self): 225 tr = self.socket_transport() 226 tr._write_fut = mock.Mock() 227 tr.close() 228 test_utils.run_briefly(self.loop) 229 self.assertFalse(self.protocol.connection_lost.called) 230 231 def test_close_buffer(self): 232 tr = self.socket_transport() 233 tr._buffer = [b'data'] 234 tr.close() 235 test_utils.run_briefly(self.loop) 236 self.assertFalse(self.protocol.connection_lost.called) 237 238 @mock.patch('asyncio.base_events.logger') 239 def test_fatal_error(self, m_logging): 240 tr = self.socket_transport() 241 tr._force_close = mock.Mock() 242 tr._fatal_error(None) 243 self.assertTrue(tr._force_close.called) 244 self.assertTrue(m_logging.error.called) 245 246 def test_force_close(self): 247 tr = self.socket_transport() 248 tr._buffer = [b'data'] 249 read_fut = tr._read_fut = mock.Mock() 250 write_fut = tr._write_fut = mock.Mock() 251 tr._force_close(None) 252 253 read_fut.cancel.assert_called_with() 254 write_fut.cancel.assert_called_with() 255 test_utils.run_briefly(self.loop) 256 self.protocol.connection_lost.assert_called_with(None) 257 self.assertEqual(None, tr._buffer) 258 self.assertEqual(tr._conn_lost, 1) 259 260 def test_loop_writing_force_close(self): 261 exc_handler = mock.Mock() 262 self.loop.set_exception_handler(exc_handler) 263 fut = self.loop.create_future() 264 fut.set_result(1) 265 self.proactor.send.return_value = fut 266 267 tr = self.socket_transport() 268 tr.write(b'data') 269 tr._force_close(None) 270 test_utils.run_briefly(self.loop) 271 exc_handler.assert_not_called() 272 273 def test_force_close_idempotent(self): 274 tr = self.socket_transport() 275 tr._closing = True 276 tr._force_close(None) 277 test_utils.run_briefly(self.loop) 278 self.assertFalse(self.protocol.connection_lost.called) 279 280 def test_fatal_error_2(self): 281 tr = self.socket_transport() 282 tr._buffer = [b'data'] 283 tr._force_close(None) 284 285 test_utils.run_briefly(self.loop) 286 self.protocol.connection_lost.assert_called_with(None) 287 self.assertEqual(None, tr._buffer) 288 289 def test_call_connection_lost(self): 290 tr = self.socket_transport() 291 tr._call_connection_lost(None) 292 self.assertTrue(self.protocol.connection_lost.called) 293 self.assertTrue(self.sock.close.called) 294 295 def test_write_eof(self): 296 tr = self.socket_transport() 297 self.assertTrue(tr.can_write_eof()) 298 tr.write_eof() 299 self.sock.shutdown.assert_called_with(socket.SHUT_WR) 300 tr.write_eof() 301 self.assertEqual(self.sock.shutdown.call_count, 1) 302 tr.close() 303 304 def test_write_eof_buffer(self): 305 tr = self.socket_transport() 306 f = self.loop.create_future() 307 tr._loop._proactor.send.return_value = f 308 tr.write(b'data') 309 tr.write_eof() 310 self.assertTrue(tr._eof_written) 311 self.assertFalse(self.sock.shutdown.called) 312 tr._loop._proactor.send.assert_called_with(self.sock, b'data') 313 f.set_result(4) 314 self.loop._run_once() 315 self.sock.shutdown.assert_called_with(socket.SHUT_WR) 316 tr.close() 317 318 def test_write_eof_write_pipe(self): 319 tr = _ProactorWritePipeTransport( 320 self.loop, self.sock, self.protocol) 321 self.assertTrue(tr.can_write_eof()) 322 tr.write_eof() 323 self.assertTrue(tr.is_closing()) 324 self.loop._run_once() 325 self.assertTrue(self.sock.close.called) 326 tr.close() 327 328 def test_write_eof_buffer_write_pipe(self): 329 tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol) 330 f = self.loop.create_future() 331 tr._loop._proactor.send.return_value = f 332 tr.write(b'data') 333 tr.write_eof() 334 self.assertTrue(tr.is_closing()) 335 self.assertFalse(self.sock.shutdown.called) 336 tr._loop._proactor.send.assert_called_with(self.sock, b'data') 337 f.set_result(4) 338 self.loop._run_once() 339 self.loop._run_once() 340 self.assertTrue(self.sock.close.called) 341 tr.close() 342 343 def test_write_eof_duplex_pipe(self): 344 tr = _ProactorDuplexPipeTransport( 345 self.loop, self.sock, self.protocol) 346 self.assertFalse(tr.can_write_eof()) 347 with self.assertRaises(NotImplementedError): 348 tr.write_eof() 349 close_transport(tr) 350 351 def test_pause_resume_reading(self): 352 tr = self.socket_transport() 353 futures = [] 354 for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']: 355 f = self.loop.create_future() 356 f.set_result(msg) 357 futures.append(f) 358 359 self.loop._proactor.recv.side_effect = futures 360 self.loop._run_once() 361 self.assertFalse(tr._paused) 362 self.assertTrue(tr.is_reading()) 363 self.loop._run_once() 364 self.protocol.data_received.assert_called_with(b'data1') 365 self.loop._run_once() 366 self.protocol.data_received.assert_called_with(b'data2') 367 368 tr.pause_reading() 369 tr.pause_reading() 370 self.assertTrue(tr._paused) 371 self.assertFalse(tr.is_reading()) 372 for i in range(10): 373 self.loop._run_once() 374 self.protocol.data_received.assert_called_with(b'data2') 375 376 tr.resume_reading() 377 tr.resume_reading() 378 self.assertFalse(tr._paused) 379 self.assertTrue(tr.is_reading()) 380 self.loop._run_once() 381 self.protocol.data_received.assert_called_with(b'data3') 382 self.loop._run_once() 383 self.protocol.data_received.assert_called_with(b'data4') 384 385 tr.pause_reading() 386 tr.resume_reading() 387 self.loop.call_exception_handler = mock.Mock() 388 self.loop._run_once() 389 self.loop.call_exception_handler.assert_not_called() 390 self.protocol.data_received.assert_called_with(b'data5') 391 tr.close() 392 393 self.assertFalse(tr.is_reading()) 394 395 396 def pause_writing_transport(self, high): 397 tr = self.socket_transport() 398 tr.set_write_buffer_limits(high=high) 399 400 self.assertEqual(tr.get_write_buffer_size(), 0) 401 self.assertFalse(self.protocol.pause_writing.called) 402 self.assertFalse(self.protocol.resume_writing.called) 403 return tr 404 405 def test_pause_resume_writing(self): 406 tr = self.pause_writing_transport(high=4) 407 408 # write a large chunk, must pause writing 409 fut = self.loop.create_future() 410 self.loop._proactor.send.return_value = fut 411 tr.write(b'large data') 412 self.loop._run_once() 413 self.assertTrue(self.protocol.pause_writing.called) 414 415 # flush the buffer 416 fut.set_result(None) 417 self.loop._run_once() 418 self.assertEqual(tr.get_write_buffer_size(), 0) 419 self.assertTrue(self.protocol.resume_writing.called) 420 421 def test_pause_writing_2write(self): 422 tr = self.pause_writing_transport(high=4) 423 424 # first short write, the buffer is not full (3 <= 4) 425 fut1 = self.loop.create_future() 426 self.loop._proactor.send.return_value = fut1 427 tr.write(b'123') 428 self.loop._run_once() 429 self.assertEqual(tr.get_write_buffer_size(), 3) 430 self.assertFalse(self.protocol.pause_writing.called) 431 432 # fill the buffer, must pause writing (6 > 4) 433 tr.write(b'abc') 434 self.loop._run_once() 435 self.assertEqual(tr.get_write_buffer_size(), 6) 436 self.assertTrue(self.protocol.pause_writing.called) 437 438 def test_pause_writing_3write(self): 439 tr = self.pause_writing_transport(high=4) 440 441 # first short write, the buffer is not full (1 <= 4) 442 fut = self.loop.create_future() 443 self.loop._proactor.send.return_value = fut 444 tr.write(b'1') 445 self.loop._run_once() 446 self.assertEqual(tr.get_write_buffer_size(), 1) 447 self.assertFalse(self.protocol.pause_writing.called) 448 449 # second short write, the buffer is not full (3 <= 4) 450 tr.write(b'23') 451 self.loop._run_once() 452 self.assertEqual(tr.get_write_buffer_size(), 3) 453 self.assertFalse(self.protocol.pause_writing.called) 454 455 # fill the buffer, must pause writing (6 > 4) 456 tr.write(b'abc') 457 self.loop._run_once() 458 self.assertEqual(tr.get_write_buffer_size(), 6) 459 self.assertTrue(self.protocol.pause_writing.called) 460 461 def test_dont_pause_writing(self): 462 tr = self.pause_writing_transport(high=4) 463 464 # write a large chunk which completes immediately, 465 # it should not pause writing 466 fut = self.loop.create_future() 467 fut.set_result(None) 468 self.loop._proactor.send.return_value = fut 469 tr.write(b'very large data') 470 self.loop._run_once() 471 self.assertEqual(tr.get_write_buffer_size(), 0) 472 self.assertFalse(self.protocol.pause_writing.called) 473 474 475class ProactorDatagramTransportTests(test_utils.TestCase): 476 477 def setUp(self): 478 super().setUp() 479 self.loop = self.new_test_loop() 480 self.proactor = mock.Mock() 481 self.loop._proactor = self.proactor 482 self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) 483 self.sock = mock.Mock(spec_set=socket.socket) 484 self.sock.fileno.return_value = 7 485 486 def datagram_transport(self, address=None): 487 self.sock.getpeername.side_effect = None if address else OSError 488 transport = _ProactorDatagramTransport(self.loop, self.sock, 489 self.protocol, 490 address=address) 491 self.addCleanup(close_transport, transport) 492 return transport 493 494 def test_sendto(self): 495 data = b'data' 496 transport = self.datagram_transport() 497 transport.sendto(data, ('0.0.0.0', 1234)) 498 self.assertTrue(self.proactor.sendto.called) 499 self.proactor.sendto.assert_called_with( 500 self.sock, data, addr=('0.0.0.0', 1234)) 501 502 def test_sendto_bytearray(self): 503 data = bytearray(b'data') 504 transport = self.datagram_transport() 505 transport.sendto(data, ('0.0.0.0', 1234)) 506 self.assertTrue(self.proactor.sendto.called) 507 self.proactor.sendto.assert_called_with( 508 self.sock, b'data', addr=('0.0.0.0', 1234)) 509 510 def test_sendto_memoryview(self): 511 data = memoryview(b'data') 512 transport = self.datagram_transport() 513 transport.sendto(data, ('0.0.0.0', 1234)) 514 self.assertTrue(self.proactor.sendto.called) 515 self.proactor.sendto.assert_called_with( 516 self.sock, b'data', addr=('0.0.0.0', 1234)) 517 518 def test_sendto_no_data(self): 519 transport = self.datagram_transport() 520 transport._buffer.append((b'data', ('0.0.0.0', 12345))) 521 transport.sendto(b'', ()) 522 self.assertFalse(self.sock.sendto.called) 523 self.assertEqual( 524 [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) 525 526 def test_sendto_buffer(self): 527 transport = self.datagram_transport() 528 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 529 transport._write_fut = object() 530 transport.sendto(b'data2', ('0.0.0.0', 12345)) 531 self.assertFalse(self.proactor.sendto.called) 532 self.assertEqual( 533 [(b'data1', ('0.0.0.0', 12345)), 534 (b'data2', ('0.0.0.0', 12345))], 535 list(transport._buffer)) 536 537 def test_sendto_buffer_bytearray(self): 538 data2 = bytearray(b'data2') 539 transport = self.datagram_transport() 540 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 541 transport._write_fut = object() 542 transport.sendto(data2, ('0.0.0.0', 12345)) 543 self.assertFalse(self.proactor.sendto.called) 544 self.assertEqual( 545 [(b'data1', ('0.0.0.0', 12345)), 546 (b'data2', ('0.0.0.0', 12345))], 547 list(transport._buffer)) 548 self.assertIsInstance(transport._buffer[1][0], bytes) 549 550 def test_sendto_buffer_memoryview(self): 551 data2 = memoryview(b'data2') 552 transport = self.datagram_transport() 553 transport._buffer.append((b'data1', ('0.0.0.0', 12345))) 554 transport._write_fut = object() 555 transport.sendto(data2, ('0.0.0.0', 12345)) 556 self.assertFalse(self.proactor.sendto.called) 557 self.assertEqual( 558 [(b'data1', ('0.0.0.0', 12345)), 559 (b'data2', ('0.0.0.0', 12345))], 560 list(transport._buffer)) 561 self.assertIsInstance(transport._buffer[1][0], bytes) 562 563 @mock.patch('asyncio.proactor_events.logger') 564 def test_sendto_exception(self, m_log): 565 data = b'data' 566 err = self.proactor.sendto.side_effect = RuntimeError() 567 568 transport = self.datagram_transport() 569 transport._fatal_error = mock.Mock() 570 transport.sendto(data, ()) 571 572 self.assertTrue(transport._fatal_error.called) 573 transport._fatal_error.assert_called_with( 574 err, 575 'Fatal write error on datagram transport') 576 transport._conn_lost = 1 577 578 transport._address = ('123',) 579 transport.sendto(data) 580 transport.sendto(data) 581 transport.sendto(data) 582 transport.sendto(data) 583 transport.sendto(data) 584 m_log.warning.assert_called_with('socket.sendto() raised exception.') 585 586 def test_sendto_error_received(self): 587 data = b'data' 588 589 self.sock.sendto.side_effect = ConnectionRefusedError 590 591 transport = self.datagram_transport() 592 transport._fatal_error = mock.Mock() 593 transport.sendto(data, ()) 594 595 self.assertEqual(transport._conn_lost, 0) 596 self.assertFalse(transport._fatal_error.called) 597 598 def test_sendto_error_received_connected(self): 599 data = b'data' 600 601 self.proactor.send.side_effect = ConnectionRefusedError 602 603 transport = self.datagram_transport(address=('0.0.0.0', 1)) 604 transport._fatal_error = mock.Mock() 605 transport.sendto(data) 606 607 self.assertFalse(transport._fatal_error.called) 608 self.assertTrue(self.protocol.error_received.called) 609 610 def test_sendto_str(self): 611 transport = self.datagram_transport() 612 self.assertRaises(TypeError, transport.sendto, 'str', ()) 613 614 def test_sendto_connected_addr(self): 615 transport = self.datagram_transport(address=('0.0.0.0', 1)) 616 self.assertRaises( 617 ValueError, transport.sendto, b'str', ('0.0.0.0', 2)) 618 619 def test_sendto_closing(self): 620 transport = self.datagram_transport(address=(1,)) 621 transport.close() 622 self.assertEqual(transport._conn_lost, 1) 623 transport.sendto(b'data', (1,)) 624 self.assertEqual(transport._conn_lost, 2) 625 626 def test__loop_writing_closing(self): 627 transport = self.datagram_transport() 628 transport._closing = True 629 transport._loop_writing() 630 self.assertIsNone(transport._write_fut) 631 test_utils.run_briefly(self.loop) 632 self.sock.close.assert_called_with() 633 self.protocol.connection_lost.assert_called_with(None) 634 635 def test__loop_writing_exception(self): 636 err = self.proactor.sendto.side_effect = RuntimeError() 637 638 transport = self.datagram_transport() 639 transport._fatal_error = mock.Mock() 640 transport._buffer.append((b'data', ())) 641 transport._loop_writing() 642 643 transport._fatal_error.assert_called_with( 644 err, 645 'Fatal write error on datagram transport') 646 647 def test__loop_writing_error_received(self): 648 self.proactor.sendto.side_effect = ConnectionRefusedError 649 650 transport = self.datagram_transport() 651 transport._fatal_error = mock.Mock() 652 transport._buffer.append((b'data', ())) 653 transport._loop_writing() 654 655 self.assertFalse(transport._fatal_error.called) 656 657 def test__loop_writing_error_received_connection(self): 658 self.proactor.send.side_effect = ConnectionRefusedError 659 660 transport = self.datagram_transport(address=('0.0.0.0', 1)) 661 transport._fatal_error = mock.Mock() 662 transport._buffer.append((b'data', ())) 663 transport._loop_writing() 664 665 self.assertFalse(transport._fatal_error.called) 666 self.assertTrue(self.protocol.error_received.called) 667 668 @mock.patch('asyncio.base_events.logger.error') 669 def test_fatal_error_connected(self, m_exc): 670 transport = self.datagram_transport(address=('0.0.0.0', 1)) 671 err = ConnectionRefusedError() 672 transport._fatal_error(err) 673 self.assertFalse(self.protocol.error_received.called) 674 m_exc.assert_not_called() 675 676 677class BaseProactorEventLoopTests(test_utils.TestCase): 678 679 def setUp(self): 680 super().setUp() 681 682 self.sock = test_utils.mock_nonblocking_socket() 683 self.proactor = mock.Mock() 684 685 self.ssock, self.csock = mock.Mock(), mock.Mock() 686 687 with mock.patch('asyncio.proactor_events.socket.socketpair', 688 return_value=(self.ssock, self.csock)): 689 with mock.patch('signal.set_wakeup_fd'): 690 self.loop = BaseProactorEventLoop(self.proactor) 691 self.set_event_loop(self.loop) 692 693 @mock.patch('asyncio.proactor_events.socket.socketpair') 694 def test_ctor(self, socketpair): 695 ssock, csock = socketpair.return_value = ( 696 mock.Mock(), mock.Mock()) 697 with mock.patch('signal.set_wakeup_fd'): 698 loop = BaseProactorEventLoop(self.proactor) 699 self.assertIs(loop._ssock, ssock) 700 self.assertIs(loop._csock, csock) 701 self.assertEqual(loop._internal_fds, 1) 702 loop.close() 703 704 def test_close_self_pipe(self): 705 self.loop._close_self_pipe() 706 self.assertEqual(self.loop._internal_fds, 0) 707 self.assertTrue(self.ssock.close.called) 708 self.assertTrue(self.csock.close.called) 709 self.assertIsNone(self.loop._ssock) 710 self.assertIsNone(self.loop._csock) 711 712 # Don't call close(): _close_self_pipe() cannot be called twice 713 self.loop._closed = True 714 715 def test_close(self): 716 self.loop._close_self_pipe = mock.Mock() 717 self.loop.close() 718 self.assertTrue(self.loop._close_self_pipe.called) 719 self.assertTrue(self.proactor.close.called) 720 self.assertIsNone(self.loop._proactor) 721 722 self.loop._close_self_pipe.reset_mock() 723 self.loop.close() 724 self.assertFalse(self.loop._close_self_pipe.called) 725 726 def test_make_socket_transport(self): 727 tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol()) 728 self.assertIsInstance(tr, _ProactorSocketTransport) 729 close_transport(tr) 730 731 def test_loop_self_reading(self): 732 self.loop._loop_self_reading() 733 self.proactor.recv.assert_called_with(self.ssock, 4096) 734 self.proactor.recv.return_value.add_done_callback.assert_called_with( 735 self.loop._loop_self_reading) 736 737 def test_loop_self_reading_fut(self): 738 fut = mock.Mock() 739 self.loop._loop_self_reading(fut) 740 self.assertTrue(fut.result.called) 741 self.proactor.recv.assert_called_with(self.ssock, 4096) 742 self.proactor.recv.return_value.add_done_callback.assert_called_with( 743 self.loop._loop_self_reading) 744 745 def test_loop_self_reading_exception(self): 746 self.loop.call_exception_handler = mock.Mock() 747 self.proactor.recv.side_effect = OSError() 748 self.loop._loop_self_reading() 749 self.assertTrue(self.loop.call_exception_handler.called) 750 751 def test_write_to_self(self): 752 self.loop._write_to_self() 753 self.csock.send.assert_called_with(b'\0') 754 755 def test_process_events(self): 756 self.loop._process_events([]) 757 758 @mock.patch('asyncio.base_events.logger') 759 def test_create_server(self, m_log): 760 pf = mock.Mock() 761 call_soon = self.loop.call_soon = mock.Mock() 762 763 self.loop._start_serving(pf, self.sock) 764 self.assertTrue(call_soon.called) 765 766 # callback 767 loop = call_soon.call_args[0][0] 768 loop() 769 self.proactor.accept.assert_called_with(self.sock) 770 771 # conn 772 fut = mock.Mock() 773 fut.result.return_value = (mock.Mock(), mock.Mock()) 774 775 make_tr = self.loop._make_socket_transport = mock.Mock() 776 loop(fut) 777 self.assertTrue(fut.result.called) 778 self.assertTrue(make_tr.called) 779 780 # exception 781 fut.result.side_effect = OSError() 782 loop(fut) 783 self.assertTrue(self.sock.close.called) 784 self.assertTrue(m_log.error.called) 785 786 def test_create_server_cancel(self): 787 pf = mock.Mock() 788 call_soon = self.loop.call_soon = mock.Mock() 789 790 self.loop._start_serving(pf, self.sock) 791 loop = call_soon.call_args[0][0] 792 793 # cancelled 794 fut = self.loop.create_future() 795 fut.cancel() 796 loop(fut) 797 self.assertTrue(self.sock.close.called) 798 799 def test_stop_serving(self): 800 sock1 = mock.Mock() 801 future1 = mock.Mock() 802 sock2 = mock.Mock() 803 future2 = mock.Mock() 804 self.loop._accept_futures = { 805 sock1.fileno(): future1, 806 sock2.fileno(): future2 807 } 808 809 self.loop._stop_serving(sock1) 810 self.assertTrue(sock1.close.called) 811 self.assertTrue(future1.cancel.called) 812 self.proactor._stop_serving.assert_called_with(sock1) 813 self.assertFalse(sock2.close.called) 814 self.assertFalse(future2.cancel.called) 815 816 def datagram_transport(self): 817 self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol) 818 return self.loop._make_datagram_transport(self.sock, self.protocol) 819 820 def test_make_datagram_transport(self): 821 tr = self.datagram_transport() 822 self.assertIsInstance(tr, _ProactorDatagramTransport) 823 close_transport(tr) 824 825 def test_datagram_loop_writing(self): 826 tr = self.datagram_transport() 827 tr._buffer.appendleft((b'data', ('127.0.0.1', 12068))) 828 tr._loop_writing() 829 self.loop._proactor.sendto.assert_called_with(self.sock, b'data', addr=('127.0.0.1', 12068)) 830 self.loop._proactor.sendto.return_value.add_done_callback.\ 831 assert_called_with(tr._loop_writing) 832 833 close_transport(tr) 834 835 def test_datagram_loop_reading(self): 836 tr = self.datagram_transport() 837 tr._loop_reading() 838 self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) 839 self.assertFalse(self.protocol.datagram_received.called) 840 self.assertFalse(self.protocol.error_received.called) 841 close_transport(tr) 842 843 def test_datagram_loop_reading_data(self): 844 res = self.loop.create_future() 845 res.set_result((b'data', ('127.0.0.1', 12068))) 846 847 tr = self.datagram_transport() 848 tr._read_fut = res 849 tr._loop_reading(res) 850 self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024) 851 self.protocol.datagram_received.assert_called_with(b'data', ('127.0.0.1', 12068)) 852 close_transport(tr) 853 854 def test_datagram_loop_reading_no_data(self): 855 res = self.loop.create_future() 856 res.set_result((b'', ('127.0.0.1', 12068))) 857 858 tr = self.datagram_transport() 859 self.assertRaises(AssertionError, tr._loop_reading, res) 860 861 tr.close = mock.Mock() 862 tr._read_fut = res 863 tr._loop_reading(res) 864 self.assertTrue(self.loop._proactor.recvfrom.called) 865 self.assertFalse(self.protocol.error_received.called) 866 self.assertFalse(tr.close.called) 867 close_transport(tr) 868 869 def test_datagram_loop_reading_aborted(self): 870 err = self.loop._proactor.recvfrom.side_effect = ConnectionAbortedError() 871 872 tr = self.datagram_transport() 873 tr._fatal_error = mock.Mock() 874 tr._protocol.error_received = mock.Mock() 875 tr._loop_reading() 876 tr._protocol.error_received.assert_called_with(err) 877 close_transport(tr) 878 879 def test_datagram_loop_writing_aborted(self): 880 err = self.loop._proactor.sendto.side_effect = ConnectionAbortedError() 881 882 tr = self.datagram_transport() 883 tr._fatal_error = mock.Mock() 884 tr._protocol.error_received = mock.Mock() 885 tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068))) 886 tr._loop_writing() 887 tr._protocol.error_received.assert_called_with(err) 888 close_transport(tr) 889 890 891@unittest.skipIf(sys.platform != 'win32', 892 'Proactor is supported on Windows only') 893class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase): 894 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 895 896 class MyProto(asyncio.Protocol): 897 898 def __init__(self, loop): 899 self.started = False 900 self.closed = False 901 self.data = bytearray() 902 self.fut = loop.create_future() 903 self.transport = None 904 905 def connection_made(self, transport): 906 self.started = True 907 self.transport = transport 908 909 def data_received(self, data): 910 self.data.extend(data) 911 912 def connection_lost(self, exc): 913 self.closed = True 914 self.fut.set_result(None) 915 916 async def wait_closed(self): 917 await self.fut 918 919 @classmethod 920 def setUpClass(cls): 921 with open(support.TESTFN, 'wb') as fp: 922 fp.write(cls.DATA) 923 super().setUpClass() 924 925 @classmethod 926 def tearDownClass(cls): 927 support.unlink(support.TESTFN) 928 super().tearDownClass() 929 930 def setUp(self): 931 self.loop = asyncio.ProactorEventLoop() 932 self.set_event_loop(self.loop) 933 self.addCleanup(self.loop.close) 934 self.file = open(support.TESTFN, 'rb') 935 self.addCleanup(self.file.close) 936 super().setUp() 937 938 def make_socket(self, cleanup=True): 939 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 940 sock.setblocking(False) 941 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 942 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) 943 if cleanup: 944 self.addCleanup(sock.close) 945 return sock 946 947 def run_loop(self, coro): 948 return self.loop.run_until_complete(coro) 949 950 def prepare(self): 951 sock = self.make_socket() 952 proto = self.MyProto(self.loop) 953 port = support.find_unused_port() 954 srv_sock = self.make_socket(cleanup=False) 955 srv_sock.bind(('127.0.0.1', port)) 956 server = self.run_loop(self.loop.create_server( 957 lambda: proto, sock=srv_sock)) 958 self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname())) 959 960 def cleanup(): 961 if proto.transport is not None: 962 # can be None if the task was cancelled before 963 # connection_made callback 964 proto.transport.close() 965 self.run_loop(proto.wait_closed()) 966 967 server.close() 968 self.run_loop(server.wait_closed()) 969 970 self.addCleanup(cleanup) 971 972 return sock, proto 973 974 def test_sock_sendfile_not_a_file(self): 975 sock, proto = self.prepare() 976 f = object() 977 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 978 "not a regular file"): 979 self.run_loop(self.loop._sock_sendfile_native(sock, f, 980 0, None)) 981 self.assertEqual(self.file.tell(), 0) 982 983 def test_sock_sendfile_iobuffer(self): 984 sock, proto = self.prepare() 985 f = io.BytesIO() 986 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 987 "not a regular file"): 988 self.run_loop(self.loop._sock_sendfile_native(sock, f, 989 0, None)) 990 self.assertEqual(self.file.tell(), 0) 991 992 def test_sock_sendfile_not_regular_file(self): 993 sock, proto = self.prepare() 994 f = mock.Mock() 995 f.fileno.return_value = -1 996 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 997 "not a regular file"): 998 self.run_loop(self.loop._sock_sendfile_native(sock, f, 999 0, None)) 1000 self.assertEqual(self.file.tell(), 0) 1001 1002 1003if __name__ == '__main__': 1004 unittest.main() 1005