"""Tests for proactor_events.py"""

import io
import socket
import unittest
import sys
from unittest import mock

import asyncio
from asyncio.proactor_events import BaseProactorEventLoop
from asyncio.proactor_events import _ProactorSocketTransport
from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport
from asyncio.proactor_events import _ProactorDatagramTransport
from test.support import os_helper
from test.support import socket_helper
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset the asyncio event loop policy once the module's tests ran."""
    asyncio.set_event_loop_policy(None)


def close_transport(transport):
    """Close the socket of *transport* without going through the loop.

    Does nothing when the transport no longer holds a socket.
    """
    # Don't call transport.close() because the event loop and the IOCP proactor
    # are mocked
    if transport._sock is None:
        return
    transport._sock.close()
    transport._sock = None


class ProactorSocketTransportTests(test_utils.TestCase):
    """Tests of _ProactorSocketTransport on top of a mocked proactor."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        # Size of the bytearray the tests expect to be passed to recv_into().
        self.buffer_size = 65536

    def socket_transport(self, waiter=None):
        """Return a socket transport whose socket is closed at cleanup."""
        transport = _ProactorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        fut = self.loop.create_future()
        tr = self.socket_transport(waiter=fut)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(fut.result())
        # Assert on the mock rather than calling it: calling it would only
        # record a call made by the test itself.
        self.protocol.connection_made.assert_called_with(tr)
        self.proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))

    def test_loop_reading(self):
        tr = self.socket_transport()
        tr._loop_reading()
        self.loop._proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))
        self.assertFalse(self.protocol.data_received.called)
        self.assertFalse(self.protocol.eof_received.called)

    def test_loop_reading_data(self):
        buf = b'data'
        res = self.loop.create_future()
        res.set_result(len(buf))

        tr = self.socket_transport()
        tr._read_fut = res
        tr._data[:len(buf)] = buf
        tr._loop_reading(res)
        called_buf = bytearray(self.buffer_size)
        called_buf[:len(buf)] = buf
        self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
        self.protocol.data_received.assert_called_with(bytearray(buf))

    def test_loop_reading_no_data(self):
        res = self.loop.create_future()
        res.set_result(0)

        tr = self.socket_transport()
        # Passing a future that is not the transport's _read_fut is rejected.
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertFalse(self.loop._proactor.recv_into.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr.close.called)

    def test_loop_reading_aborted(self):
        err = self.loop._proactor.recv_into.side_effect = \
            ConnectionAbortedError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_loop_reading_aborted_closing(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._closing = True
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)

    def test_loop_reading_aborted_is_fatal(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_conn_reset_lost(self):
        err = self.loop._proactor.recv_into.side_effect = \
            ConnectionResetError()

        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._force_close = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)
        tr._force_close.assert_called_with(err)

    def test_loop_reading_exception(self):
        err = self.loop._proactor.recv_into.side_effect = (OSError())

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_write(self):
        tr = self.socket_transport()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertIsNone(tr._buffer)
        tr._loop_writing.assert_called_with(data=b'data')

    def test_write_no_data(self):
        tr = self.socket_transport()
        tr.write(b'')
        self.assertFalse(tr._buffer)

    def test_write_more(self):
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertEqual(tr._buffer, b'data')
        self.assertFalse(tr._loop_writing.called)

    def test_loop_writing(self):
        tr = self.socket_transport()
        tr._buffer = bytearray(b'data')
        tr._loop_writing()
        self.loop._proactor.send.assert_called_with(self.sock, b'data')
        send_fut = self.loop._proactor.send.return_value
        send_fut.add_done_callback.assert_called_with(tr._loop_writing)

    @mock.patch('asyncio.proactor_events.logger')
    def test_loop_writing_err(self, m_log):
        err = self.loop._proactor.send.side_effect = OSError()
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._buffer = [b'da', b'ta']
        tr._loop_writing()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal write error on pipe transport')
        tr._conn_lost = 1

        # Keep writing on the lost connection; the test expects a warning
        # to have been logged after five further writes.
        for _ in range(5):
            tr.write(b'data')
        self.assertIsNone(tr._buffer)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_loop_writing_stop(self):
        fut = self.loop.create_future()
        fut.set_result(b'data')

        tr = self.socket_transport()
        tr._write_fut = fut
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)

    def test_loop_writing_closing(self):
        fut = self.loop.create_future()
        fut.set_result(1)

        tr = self.socket_transport()
        tr._write_fut = fut
        tr.close()
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_abort(self):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        tr = self.socket_transport()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._conn_lost, 1)

        # A second close() must not notify the protocol again.
        self.protocol.connection_lost.reset_mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_write_fut(self):
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_buffer(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    @mock.patch('asyncio.base_events.logger')
    def test_fatal_error(self, m_logging):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(None)
        self.assertTrue(tr._force_close.called)
        self.assertTrue(m_logging.error.called)

    def test_force_close(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        read_fut = tr._read_fut = mock.Mock()
        write_fut = tr._write_fut = mock.Mock()
        tr._force_close(None)

        read_fut.cancel.assert_called_with()
        write_fut.cancel.assert_called_with()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertIsNone(tr._buffer)
        self.assertEqual(tr._conn_lost, 1)

    def test_loop_writing_force_close(self):
        exc_handler = mock.Mock()
        self.loop.set_exception_handler(exc_handler)
        fut = self.loop.create_future()
        fut.set_result(1)
        self.proactor.send.return_value = fut

        tr = self.socket_transport()
        tr.write(b'data')
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        exc_handler.assert_not_called()

    def test_force_close_idempotent(self):
        tr = self.socket_transport()
        tr._closing = True
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_fatal_error_2(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr._force_close(None)

        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertIsNone(tr._buffer)

    def test_call_connection_lost(self):
        tr = self.socket_transport()
        tr._call_connection_lost(None)
        self.assertTrue(self.protocol.connection_lost.called)
        self.assertTrue(self.sock.close.called)

    def test_write_eof(self):
        tr = self.socket_transport()
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        # A repeated write_eof() must not shut the socket down twice.
        tr.write_eof()
        self.assertEqual(self.sock.shutdown.call_count, 1)
        tr.close()

    def test_write_eof_buffer(self):
        tr = self.socket_transport()
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr._eof_written)
        # The shutdown is expected only once the pending send completed.
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.close()

    def test_write_eof_write_pipe(self):
        tr = _ProactorWritePipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_buffer_write_pipe(self):
        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_duplex_pipe(self):
        tr = _ProactorDuplexPipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertFalse(tr.can_write_eof())
        with self.assertRaises(NotImplementedError):
            tr.write_eof()
        close_transport(tr)

    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
        reversed_msgs = list(reversed(msgs))

        def recv_into(sock, data):
            f = self.loop.create_future()
            msg = reversed_msgs.pop()

            result = f.result

            # Copy the message into the receive buffer only when the result
            # of the future is fetched.
            def monkey():
                data[:len(msg)] = msg
                return result()
            f.result = monkey

            f.set_result(len(msg))
            return f

        self.loop._proactor.recv_into.side_effect = recv_into
        self.loop._run_once()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[:2]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        # While paused, no new data may reach the protocol.
        for _ in range(10):
            self.loop._run_once()
        self.protocol.data_received.assert_called_with(bytearray(msgs[1]))

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[2:4]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        tr.pause_reading()
        tr.resume_reading()
        self.loop.call_exception_handler = mock.Mock()
        self.loop._run_once()
        self.loop.call_exception_handler.assert_not_called()
        self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
        tr.close()

        self.assertFalse(tr.is_reading())

    def pause_writing_transport(self, high):
        """Return a fresh transport whose high-water mark is *high*."""
        tr = self.socket_transport()
        tr.set_write_buffer_limits(high=high)

        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
        self.assertFalse(self.protocol.resume_writing.called)
        return tr

    def test_pause_resume_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk, must pause writing
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'large data')
        self.loop._run_once()
        self.assertTrue(self.protocol.pause_writing.called)

        # flush the buffer
        fut.set_result(None)
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertTrue(self.protocol.resume_writing.called)

    def test_pause_writing_2write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (3 <= 4)
        fut1 = self.loop.create_future()
        self.loop._proactor.send.return_value = fut1
        tr.write(b'123')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_pause_writing_3write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (1 <= 4)
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'1')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 1)
        self.assertFalse(self.protocol.pause_writing.called)

        # second short write, the buffer is not full (3 <= 4)
        tr.write(b'23')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_dont_pause_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk which completes immediately,
        # it should not pause writing
        fut = self.loop.create_future()
        fut.set_result(None)
        self.loop._proactor.send.return_value = fut
        tr.write(b'very large data')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)


class ProactorDatagramTransportTests(test_utils.TestCase):
    """Tests of _ProactorDatagramTransport on top of a mocked proactor."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        self.sock = mock.Mock(spec_set=socket.socket)
        self.sock.fileno.return_value = 7

    def datagram_transport(self, address=None):
        """Return a datagram transport, connected when *address* is given."""
        # Without a remote address, getpeername() on the mocked socket
        # raises OSError.
        self.sock.getpeername.side_effect = None if address else OSError
        transport = _ProactorDatagramTransport(self.loop, self.sock,
                                               self.protocol,
                                               address=address)
        self.addCleanup(close_transport, transport)
        return transport

    def test_sendto(self):
        data = b'data'
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, data, addr=('0.0.0.0', 1234))

    def test_sendto_bytearray(self):
        data = bytearray(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_memoryview(self):
        data = memoryview(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_no_data(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
        transport.sendto(b'', ())
        # Datagrams go out through the proactor (see test_sendto), so that
        # is the mock that has to stay untouched.
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    def test_sendto_buffer(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(b'data2', ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))

    def test_sendto_buffer_bytearray(self):
        data2 = bytearray(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        self.assertIsInstance(transport._buffer[1][0], bytes)

    def test_sendto_buffer_memoryview(self):
        data2 = memoryview(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        self.assertIsInstance(transport._buffer[1][0], bytes)

    @mock.patch('asyncio.proactor_events.logger')
    def test_sendto_exception(self, m_log):
        data = b'data'
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertTrue(transport._fatal_error.called)
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')
        transport._conn_lost = 1

        transport._address = ('123',)
        # Keep sending on the lost connection; the test expects a warning
        # to have been logged after five further sends.
        for _ in range(5):
            transport.sendto(data)
        m_log.warning.assert_called_with('socket.sendto() raised exception.')

    def test_sendto_error_received(self):
        data = b'data'

        # Datagrams go out through the proactor (see test_sendto), so the
        # error has to be injected there; a side effect on the socket mock
        # would never be triggered.
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertEqual(transport._conn_lost, 0)
        self.assertFalse(transport._fatal_error.called)

    def test_sendto_error_received_connected(self):
        data = b'data'

        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport.sendto(data)

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_str(self):
        transport = self.datagram_transport()
        self.assertRaises(TypeError, transport.sendto, 'str', ())

    def test_sendto_connected_addr(self):
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        self.assertRaises(
            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))

    def test_sendto_closing(self):
        transport = self.datagram_transport(address=(1,))
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        transport.sendto(b'data', (1,))
        self.assertEqual(transport._conn_lost, 2)

    def test__loop_writing_closing(self):
        transport = self.datagram_transport()
        transport._closing = True
        transport._loop_writing()
        self.assertIsNone(transport._write_fut)
        test_utils.run_briefly(self.loop)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    def test__loop_writing_exception(self):
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')

    def test__loop_writing_error_received(self):
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)

    def test__loop_writing_error_received_connection(self):
        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected(self, m_exc):
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = ConnectionRefusedError()
        transport._fatal_error(err)
        self.assertFalse(self.protocol.error_received.called)
        m_exc.assert_not_called()


class BaseProactorEventLoopTests(test_utils.TestCase):
    """Tests of BaseProactorEventLoop with a mocked proactor and self-pipe."""

    def setUp(self):
        super().setUp()

        self.sock = test_utils.mock_nonblocking_socket()
        self.proactor = mock.Mock()

        self.ssock, self.csock = mock.Mock(), mock.Mock()

        # Build the loop with socketpair() and signal.set_wakeup_fd() patched
        # so that its self-pipe is made of the two mocks above.
        with mock.patch('asyncio.proactor_events.socket.socketpair',
                        return_value=(self.ssock, self.csock)):
            with mock.patch('signal.set_wakeup_fd'):
                self.loop = BaseProactorEventLoop(self.proactor)
        self.set_event_loop(self.loop)

    @mock.patch('asyncio.proactor_events.socket.socketpair')
    def test_ctor(self, socketpair):
        ssock, csock = socketpair.return_value = (
            mock.Mock(), mock.Mock())
        with mock.patch('signal.set_wakeup_fd'):
            loop = BaseProactorEventLoop(self.proactor)
        self.assertIs(loop._ssock, ssock)
        self.assertIs(loop._csock, csock)
        self.assertEqual(loop._internal_fds, 1)
        loop.close()

    def test_close_self_pipe(self):
        self.loop._close_self_pipe()
        self.assertEqual(self.loop._internal_fds, 0)
        self.assertTrue(self.ssock.close.called)
        self.assertTrue(self.csock.close.called)
        self.assertIsNone(self.loop._ssock)
        self.assertIsNone(self.loop._csock)

        # Don't call close(): _close_self_pipe() cannot be called twice
        self.loop._closed = True

    def test_close(self):
        self.loop._close_self_pipe = mock.Mock()
        self.loop.close()
        self.assertTrue(self.loop._close_self_pipe.called)
        self.assertTrue(self.proactor.close.called)
        self.assertIsNone(self.loop._proactor)

        # Closing an already closed loop must not close the self-pipe again.
        self.loop._close_self_pipe.reset_mock()
        self.loop.close()
        self.assertFalse(self.loop._close_self_pipe.called)

    def test_make_socket_transport(self):
        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
        self.assertIsInstance(tr, _ProactorSocketTransport)
        close_transport(tr)

    def test_loop_self_reading(self):
        self.loop._loop_self_reading()
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_fut(self):
        fut = mock.Mock()
        self.loop._self_reading_future = fut
        self.loop._loop_self_reading(fut)
        self.assertTrue(fut.result.called)
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_exception(self):
        self.loop.call_exception_handler = mock.Mock()
        self.proactor.recv.side_effect = OSError()
        self.loop._loop_self_reading()
        self.assertTrue(self.loop.call_exception_handler.called)

    def test_write_to_self(self):
        self.loop._write_to_self()
        self.csock.send.assert_called_with(b'\0')

    def test_process_events(self):
        self.loop._process_events([])

    @mock.patch('asyncio.base_events.logger')
    def test_create_server(self, m_log):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        self.assertTrue(call_soon.called)

        # callback
        loop = call_soon.call_args[0][0]
        loop()
        self.proactor.accept.assert_called_with(self.sock)

        # conn
        fut = mock.Mock()
        fut.result.return_value = (mock.Mock(), mock.Mock())

        make_tr = self.loop._make_socket_transport = mock.Mock()
        loop(fut)
        self.assertTrue(fut.result.called)
        self.assertTrue(make_tr.called)

        # exception
        fut.result.side_effect = OSError()
        loop(fut)
        self.assertTrue(self.sock.close.called)
        self.assertTrue(m_log.error.called)

    def test_create_server_cancel(self):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        loop = call_soon.call_args[0][0]

        # cancelled
        fut = self.loop.create_future()
        fut.cancel()
        loop(fut)
        self.assertTrue(self.sock.close.called)

    def test_stop_serving(self):
        sock1 = mock.Mock()
        future1 = mock.Mock()
        sock2 = mock.Mock()
        future2 = mock.Mock()
        self.loop._accept_futures = {
            sock1.fileno(): future1,
            sock2.fileno(): future2
        }

        self.loop._stop_serving(sock1)
        self.assertTrue(sock1.close.called)
        self.assertTrue(future1.cancel.called)
        self.proactor._stop_serving.assert_called_with(sock1)
        # The other listening socket must be left alone.
        self.assertFalse(sock2.close.called)
        self.assertFalse(future2.cancel.called)

    def datagram_transport(self):
        """Return a datagram transport made by the loop for self.sock."""
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        return self.loop._make_datagram_transport(self.sock, self.protocol)

    def test_make_datagram_transport(self):
        tr = self.datagram_transport()
        self.assertIsInstance(tr, _ProactorDatagramTransport)
        close_transport(tr)

    def test_datagram_loop_writing(self):
        tr = self.datagram_transport()
        tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
        tr._loop_writing()
        self.loop._proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('127.0.0.1', 12068))
        sendto_fut = self.loop._proactor.sendto.return_value
        sendto_fut.add_done_callback.assert_called_with(tr._loop_writing)

        close_transport(tr)

    def test_datagram_loop_reading(self):
        tr = self.datagram_transport()
        tr._loop_reading()
        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
        self.assertFalse(self.protocol.datagram_received.called)
        self.assertFalse(self.protocol.error_received.called)
        close_transport(tr)

    def test_datagram_loop_reading_data(self):
        res = self.loop.create_future()
        res.set_result((b'data', ('127.0.0.1', 12068)))

        tr = self.datagram_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
        self.protocol.datagram_received.assert_called_with(
            b'data', ('127.0.0.1', 12068))
        close_transport(tr)

    def test_datagram_loop_reading_no_data(self):
        res = self.loop.create_future()
        res.set_result((b'', ('127.0.0.1', 12068)))

        tr = self.datagram_transport()
        # Passing a future that is not the transport's _read_fut is rejected.
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertTrue(self.loop._proactor.recvfrom.called)
        self.assertFalse(self.protocol.error_received.called)
        self.assertFalse(tr.close.called)
        close_transport(tr)

    def test_datagram_loop_reading_aborted(self):
        err = self.loop._proactor.recvfrom.side_effect = \
            ConnectionAbortedError()

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr._protocol.error_received = mock.Mock()
        tr._loop_reading()
        tr._protocol.error_received.assert_called_with(err)
        close_transport(tr)

    def test_datagram_loop_writing_aborted(self):
        err = self.loop._proactor.sendto.side_effect = \
            ConnectionAbortedError()

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr._protocol.error_received = mock.Mock()
        tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
        tr._loop_writing()
        tr._protocol.error_received.assert_called_with(err)
        close_transport(tr)


@unittest.skipIf(sys.platform != 'win32',
                 'Proactor is supported on Windows only')
class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    """Tests of _sock_sendfile_native() on a real ProactorEventLoop."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Protocol recording what it receives and when it is closed."""

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Completed by connection_lost(); awaited by wait_closed().
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)
        self.addCleanup(self.loop.close)
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1024-byte buffer options.

        The socket is closed at test cleanup unless *cleanup* is false.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
        if cleanup:
            self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a local server and connect a client socket to it.

        Return the connected client socket and the server-side protocol.
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        port = socket_helper.find_unused_port()
        srv_sock = self.make_socket(cleanup=False)
        srv_sock.bind(('127.0.0.1', port))
        server = self.run_loop(self.loop.create_server(
            lambda: proto, sock=srv_sock))
        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))

        def cleanup():
            if proto.transport is not None:
                # can be None if the task was cancelled before
                # connection_made callback
                proto.transport.close()
                self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test_sock_sendfile_not_a_file(self):
        sock, proto = self.prepare()
        f = object()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_iobuffer(self):
        sock, proto = self.prepare()
        f = io.BytesIO()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_regular_file(self):
        sock, proto = self.prepare()
        f = mock.Mock()
        f.fileno.return_value = -1
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)


if __name__ == '__main__':
    unittest.main()