"""Tests for proactor_events.py"""

import io
import socket
import unittest
import sys
from unittest import mock

import asyncio
from asyncio.proactor_events import BaseProactorEventLoop
from asyncio.proactor_events import _ProactorSocketTransport
from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport
from asyncio.proactor_events import _ProactorDatagramTransport
from test.support import os_helper
from test.support import socket_helper
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset the global event loop policy once the module's tests are done."""
    asyncio.set_event_loop_policy(None)


def close_transport(transport):
    """Close the transport's socket directly and detach it from the transport.

    Does nothing when the transport no longer holds a socket.
    """
    # Don't call transport.close() because the event loop and the IOCP proactor
    # are mocked
    if transport._sock is None:
        return
    transport._sock.close()
    transport._sock = None


class ProactorSocketTransportTests(test_utils.TestCase):
    """Tests for the stream/pipe transports, run against a mocked proactor."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)
        # Size of the receive buffer the tests expect in recv_into() calls.
        self.buffer_size = 65536

    def socket_transport(self, waiter=None):
        """Create a socket transport on the mocked socket; closed on cleanup."""
        transport = _ProactorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        fut = self.loop.create_future()
        tr = self.socket_transport(waiter=fut)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(fut.result())
        # Assert on the mock instead of calling it: calling the mock would
        # record a new call and verify nothing.
        self.protocol.connection_made.assert_called_with(tr)
        self.proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))

    def test_loop_reading(self):
        tr = self.socket_transport()
        tr._loop_reading()
        self.loop._proactor.recv_into.assert_called_with(
            self.sock, bytearray(self.buffer_size))
        self.assertFalse(self.protocol.data_received.called)
        self.assertFalse(self.protocol.eof_received.called)

    def test_loop_reading_data(self):
        buf = b'data'
        res = self.loop.create_future()
        res.set_result(len(buf))

        tr = self.socket_transport()
        tr._read_fut = res
        tr._data[:len(buf)] = buf
        tr._loop_reading(res)
        called_buf = bytearray(self.buffer_size)
        called_buf[:len(buf)] = buf
        self.loop._proactor.recv_into.assert_called_with(self.sock, called_buf)
        self.protocol.data_received.assert_called_with(bytearray(buf))

    def test_loop_reading_no_data(self):
        res = self.loop.create_future()
        res.set_result(0)

        tr = self.socket_transport()
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertFalse(self.loop._proactor.recv_into.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr.close.called)

    def test_loop_reading_aborted(self):
        err = self.loop._proactor.recv_into.side_effect = (
            ConnectionAbortedError())

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_loop_reading_aborted_closing(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._closing = True
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)

    def test_loop_reading_aborted_is_fatal(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_conn_reset_lost(self):
        err = self.loop._proactor.recv_into.side_effect = (
            ConnectionResetError())

        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._force_close = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)
        tr._force_close.assert_called_with(err)

    def test_loop_reading_exception(self):
        err = self.loop._proactor.recv_into.side_effect = (OSError())

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_write(self):
        tr = self.socket_transport()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertEqual(tr._buffer, None)
        tr._loop_writing.assert_called_with(data=b'data')

    def test_write_no_data(self):
        tr = self.socket_transport()
        tr.write(b'')
        self.assertFalse(tr._buffer)

    def test_write_more(self):
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertEqual(tr._buffer, b'data')
        self.assertFalse(tr._loop_writing.called)

    def test_loop_writing(self):
        tr = self.socket_transport()
        tr._buffer = bytearray(b'data')
        tr._loop_writing()
        self.loop._proactor.send.assert_called_with(self.sock, b'data')
        self.loop._proactor.send.return_value.add_done_callback.\
            assert_called_with(tr._loop_writing)

    @mock.patch('asyncio.proactor_events.logger')
    def test_loop_writing_err(self, m_log):
        err = self.loop._proactor.send.side_effect = OSError()
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._buffer = [b'da', b'ta']
        tr._loop_writing()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal write error on pipe transport')
        tr._conn_lost = 1

        # Repeated writes on a lost connection; the test then expects the
        # warning below to have been logged.
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        self.assertEqual(tr._buffer, None)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_loop_writing_stop(self):
        fut = self.loop.create_future()
        fut.set_result(b'data')

        tr = self.socket_transport()
        tr._write_fut = fut
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)

    def test_loop_writing_closing(self):
        fut = self.loop.create_future()
        fut.set_result(1)

        tr = self.socket_transport()
        tr._write_fut = fut
        tr.close()
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_abort(self):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        tr = self.socket_transport()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._conn_lost, 1)

        # A second close() must not notify the protocol again.
        self.protocol.connection_lost.reset_mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_write_fut(self):
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_buffer(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_invalid_sockobj(self):
        tr = self.socket_transport()
        self.sock.fileno.return_value = -1
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertFalse(self.sock.shutdown.called)

    @mock.patch('asyncio.base_events.logger')
    def test_fatal_error(self, m_logging):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(None)
        self.assertTrue(tr._force_close.called)
        self.assertTrue(m_logging.error.called)

    def test_force_close(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        read_fut = tr._read_fut = mock.Mock()
        write_fut = tr._write_fut = mock.Mock()
        tr._force_close(None)

        read_fut.cancel.assert_called_with()
        write_fut.cancel.assert_called_with()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertEqual(None, tr._buffer)
        self.assertEqual(tr._conn_lost, 1)

    def test_loop_writing_force_close(self):
        exc_handler = mock.Mock()
        self.loop.set_exception_handler(exc_handler)
        fut = self.loop.create_future()
        fut.set_result(1)
        self.proactor.send.return_value = fut

        tr = self.socket_transport()
        tr.write(b'data')
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        exc_handler.assert_not_called()

    def test_force_close_idempotent(self):
        tr = self.socket_transport()
        tr._closing = True
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_fatal_error_2(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr._force_close(None)

        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertEqual(None, tr._buffer)

    def test_call_connection_lost(self):
        tr = self.socket_transport()
        tr._call_connection_lost(None)
        self.assertTrue(self.protocol.connection_lost.called)
        self.assertTrue(self.sock.close.called)

    def test_write_eof(self):
        tr = self.socket_transport()
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        # A second write_eof() must not shut the socket down again.
        tr.write_eof()
        self.assertEqual(self.sock.shutdown.call_count, 1)
        tr.close()

    def test_write_eof_buffer(self):
        tr = self.socket_transport()
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr._eof_written)
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.close()

    def test_write_eof_write_pipe(self):
        tr = _ProactorWritePipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_buffer_write_pipe(self):
        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
        f = self.loop.create_future()
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_duplex_pipe(self):
        tr = _ProactorDuplexPipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertFalse(tr.can_write_eof())
        with self.assertRaises(NotImplementedError):
            tr.write_eof()
        close_transport(tr)

    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
        reversed_msgs = list(reversed(msgs))

        def recv_into(sock, data):
            f = self.loop.create_future()
            msg = reversed_msgs.pop()

            # Copy the message into the receive buffer only when the result
            # is fetched from the future.
            result = f.result
            def monkey():
                data[:len(msg)] = msg
                return result()
            f.result = monkey

            f.set_result(len(msg))
            return f

        self.loop._proactor.recv_into.side_effect = recv_into
        self.loop._run_once()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[:2]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        for _ in range(10):
            self.loop._run_once()
        # While paused, no further message reaches the protocol.
        self.protocol.data_received.assert_called_with(bytearray(msgs[1]))

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())

        for msg in msgs[2:4]:
            self.loop._run_once()
            self.protocol.data_received.assert_called_with(bytearray(msg))

        tr.pause_reading()
        tr.resume_reading()
        self.loop.call_exception_handler = mock.Mock()
        self.loop._run_once()
        self.loop.call_exception_handler.assert_not_called()
        self.protocol.data_received.assert_called_with(bytearray(msgs[4]))
        tr.close()

        self.assertFalse(tr.is_reading())

    def pause_writing_transport(self, high):
        """Return a transport whose write-buffer high-water mark is *high*."""
        tr = self.socket_transport()
        tr.set_write_buffer_limits(high=high)

        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
        self.assertFalse(self.protocol.resume_writing.called)
        return tr

    def test_pause_resume_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk, must pause writing
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'large data')
        self.loop._run_once()
        self.assertTrue(self.protocol.pause_writing.called)

        # flush the buffer
        fut.set_result(None)
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertTrue(self.protocol.resume_writing.called)

    def test_pause_writing_2write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (3 <= 4)
        fut1 = self.loop.create_future()
        self.loop._proactor.send.return_value = fut1
        tr.write(b'123')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_pause_writing_3write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (1 <= 4)
        fut = self.loop.create_future()
        self.loop._proactor.send.return_value = fut
        tr.write(b'1')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 1)
        self.assertFalse(self.protocol.pause_writing.called)

        # second short write, the buffer is not full (3 <= 4)
        tr.write(b'23')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_dont_pause_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk which completes immediately,
        # it should not pause writing
        fut = self.loop.create_future()
        fut.set_result(None)
        self.loop._proactor.send.return_value = fut
        tr.write(b'very large data')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)


class ProactorDatagramTransportTests(test_utils.TestCase):
    """Tests for the datagram transport, run against a mocked proactor."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        self.sock = mock.Mock(spec_set=socket.socket)
        self.sock.fileno.return_value = 7

    def datagram_transport(self, address=None):
        """Create a datagram transport, optionally bound to a peer *address*."""
        # Without a peer address, getpeername() on the mocked socket raises.
        self.sock.getpeername.side_effect = None if address else OSError
        transport = _ProactorDatagramTransport(self.loop, self.sock,
                                               self.protocol,
                                               address=address)
        self.addCleanup(close_transport, transport)
        return transport

    def test_sendto(self):
        data = b'data'
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, data, addr=('0.0.0.0', 1234))

    def test_sendto_bytearray(self):
        data = bytearray(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_memoryview(self):
        data = memoryview(b'data')
        transport = self.datagram_transport()
        transport.sendto(data, ('0.0.0.0', 1234))
        self.assertTrue(self.proactor.sendto.called)
        self.proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('0.0.0.0', 1234))

    def test_sendto_no_data(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data', ('0.0.0.0', 12345)))
        transport.sendto(b'', ())
        self.assertFalse(self.sock.sendto.called)
        self.assertEqual(
            [(b'data', ('0.0.0.0', 12345))], list(transport._buffer))

    def test_sendto_buffer(self):
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(b'data2', ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))

    def test_sendto_buffer_bytearray(self):
        data2 = bytearray(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        self.assertIsInstance(transport._buffer[1][0], bytes)

    def test_sendto_buffer_memoryview(self):
        data2 = memoryview(b'data2')
        transport = self.datagram_transport()
        transport._buffer.append((b'data1', ('0.0.0.0', 12345)))
        transport._write_fut = object()
        transport.sendto(data2, ('0.0.0.0', 12345))
        self.assertFalse(self.proactor.sendto.called)
        self.assertEqual(
            [(b'data1', ('0.0.0.0', 12345)),
             (b'data2', ('0.0.0.0', 12345))],
            list(transport._buffer))
        self.assertIsInstance(transport._buffer[1][0], bytes)

    @mock.patch('asyncio.proactor_events.logger')
    def test_sendto_exception(self, m_log):
        data = b'data'
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertTrue(transport._fatal_error.called)
        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')
        transport._conn_lost = 1

        # Repeated sends on a lost connection; the test then expects the
        # warning below to have been logged.
        transport._address = ('123',)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        transport.sendto(data)
        m_log.warning.assert_called_with('socket.sendto() raised exception.')

    def test_sendto_error_received(self):
        data = b'data'

        # The transport sends through the proactor (see test_sendto), so the
        # error has to be injected there for the error path to run.
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport.sendto(data, ())

        self.assertEqual(transport._conn_lost, 0)
        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_error_received_connected(self):
        data = b'data'

        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport.sendto(data)

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    def test_sendto_str(self):
        transport = self.datagram_transport()
        self.assertRaises(TypeError, transport.sendto, 'str', ())

    def test_sendto_connected_addr(self):
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        self.assertRaises(
            ValueError, transport.sendto, b'str', ('0.0.0.0', 2))

    def test_sendto_closing(self):
        transport = self.datagram_transport(address=(1,))
        transport.close()
        self.assertEqual(transport._conn_lost, 1)
        transport.sendto(b'data', (1,))
        self.assertEqual(transport._conn_lost, 2)

    def test__loop_writing_closing(self):
        transport = self.datagram_transport()
        transport._closing = True
        transport._loop_writing()
        self.assertIsNone(transport._write_fut)
        test_utils.run_briefly(self.loop)
        self.sock.close.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    def test__loop_writing_exception(self):
        err = self.proactor.sendto.side_effect = RuntimeError()

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        transport._fatal_error.assert_called_with(
            err,
            'Fatal write error on datagram transport')

    def test__loop_writing_error_received(self):
        self.proactor.sendto.side_effect = ConnectionRefusedError

        transport = self.datagram_transport()
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)

    def test__loop_writing_error_received_connection(self):
        self.proactor.send.side_effect = ConnectionRefusedError

        transport = self.datagram_transport(address=('0.0.0.0', 1))
        transport._fatal_error = mock.Mock()
        transport._buffer.append((b'data', ()))
        transport._loop_writing()

        self.assertFalse(transport._fatal_error.called)
        self.assertTrue(self.protocol.error_received.called)

    @mock.patch('asyncio.base_events.logger.error')
    def test_fatal_error_connected(self, m_exc):
        transport = self.datagram_transport(address=('0.0.0.0', 1))
        err = ConnectionRefusedError()
        transport._fatal_error(err)
        self.assertFalse(self.protocol.error_received.called)
        m_exc.assert_not_called()


class BaseProactorEventLoopTests(test_utils.TestCase):
    """Tests for BaseProactorEventLoop built on a mocked proactor."""

    def setUp(self):
        super().setUp()

        self.sock = test_utils.mock_nonblocking_socket()
        self.proactor = mock.Mock()

        self.ssock, self.csock = mock.Mock(), mock.Mock()

        # Build the loop with socketpair() and signal.set_wakeup_fd() patched
        # out, so the self-pipe ends are the mocks created above.
        with mock.patch('asyncio.proactor_events.socket.socketpair',
                        return_value=(self.ssock, self.csock)):
            with mock.patch('signal.set_wakeup_fd'):
                self.loop = BaseProactorEventLoop(self.proactor)
        self.set_event_loop(self.loop)

    @mock.patch('asyncio.proactor_events.socket.socketpair')
    def test_ctor(self, socketpair):
        ssock, csock = socketpair.return_value = (
            mock.Mock(), mock.Mock())
        with mock.patch('signal.set_wakeup_fd'):
            loop = BaseProactorEventLoop(self.proactor)
        self.assertIs(loop._ssock, ssock)
        self.assertIs(loop._csock, csock)
        self.assertEqual(loop._internal_fds, 1)
        loop.close()

    def test_close_self_pipe(self):
        self.loop._close_self_pipe()
        self.assertEqual(self.loop._internal_fds, 0)
        self.assertTrue(self.ssock.close.called)
        self.assertTrue(self.csock.close.called)
        self.assertIsNone(self.loop._ssock)
        self.assertIsNone(self.loop._csock)

        # Don't call close(): _close_self_pipe() cannot be called twice
        self.loop._closed = True

    def test_close(self):
        self.loop._close_self_pipe = mock.Mock()
        self.loop.close()
        self.assertTrue(self.loop._close_self_pipe.called)
        self.assertTrue(self.proactor.close.called)
        self.assertIsNone(self.loop._proactor)

        # A second close() must not close the self-pipe again.
        self.loop._close_self_pipe.reset_mock()
        self.loop.close()
        self.assertFalse(self.loop._close_self_pipe.called)

    def test_make_socket_transport(self):
        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
        self.assertIsInstance(tr, _ProactorSocketTransport)
        close_transport(tr)

    def test_loop_self_reading(self):
        self.loop._loop_self_reading()
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_fut(self):
        fut = mock.Mock()
        self.loop._self_reading_future = fut
        self.loop._loop_self_reading(fut)
        self.assertTrue(fut.result.called)
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_exception(self):
        self.loop.call_exception_handler = mock.Mock()
        self.proactor.recv.side_effect = OSError()
        self.loop._loop_self_reading()
        self.assertTrue(self.loop.call_exception_handler.called)

    def test_write_to_self(self):
        self.loop._write_to_self()
        self.csock.send.assert_called_with(b'\0')

    def test_process_events(self):
        self.loop._process_events([])

    @mock.patch('asyncio.base_events.logger')
    def test_create_server(self, m_log):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        self.assertTrue(call_soon.called)

        # callback
        loop = call_soon.call_args[0][0]
        loop()
        self.proactor.accept.assert_called_with(self.sock)

        # conn
        fut = mock.Mock()
        fut.result.return_value = (mock.Mock(), mock.Mock())

        make_tr = self.loop._make_socket_transport = mock.Mock()
        loop(fut)
        self.assertTrue(fut.result.called)
        self.assertTrue(make_tr.called)

        # exception
        fut.result.side_effect = OSError()
        loop(fut)
        self.assertTrue(self.sock.close.called)
        self.assertTrue(m_log.error.called)

    def test_create_server_cancel(self):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        loop = call_soon.call_args[0][0]

        # cancelled
        fut = self.loop.create_future()
        fut.cancel()
        loop(fut)
        self.assertTrue(self.sock.close.called)

    def test_stop_serving(self):
        sock1 = mock.Mock()
        future1 = mock.Mock()
        sock2 = mock.Mock()
        future2 = mock.Mock()
        self.loop._accept_futures = {
            sock1.fileno(): future1,
            sock2.fileno(): future2
        }

        # Only the socket being stopped may be closed and have its accept
        # future cancelled.
        self.loop._stop_serving(sock1)
        self.assertTrue(sock1.close.called)
        self.assertTrue(future1.cancel.called)
        self.proactor._stop_serving.assert_called_with(sock1)
        self.assertFalse(sock2.close.called)
        self.assertFalse(future2.cancel.called)

    def datagram_transport(self):
        """Create a datagram transport through the loop's factory method."""
        self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
        return self.loop._make_datagram_transport(self.sock, self.protocol)

    def test_make_datagram_transport(self):
        tr = self.datagram_transport()
        self.assertIsInstance(tr, _ProactorDatagramTransport)
        self.assertIsInstance(tr, asyncio.DatagramTransport)
        close_transport(tr)

    def test_datagram_loop_writing(self):
        tr = self.datagram_transport()
        tr._buffer.appendleft((b'data', ('127.0.0.1', 12068)))
        tr._loop_writing()
        self.loop._proactor.sendto.assert_called_with(
            self.sock, b'data', addr=('127.0.0.1', 12068))
        self.loop._proactor.sendto.return_value.add_done_callback.\
            assert_called_with(tr._loop_writing)

        close_transport(tr)

    def test_datagram_loop_reading(self):
        tr = self.datagram_transport()
        tr._loop_reading()
        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
        self.assertFalse(self.protocol.datagram_received.called)
        self.assertFalse(self.protocol.error_received.called)
        close_transport(tr)

    def test_datagram_loop_reading_data(self):
        res = self.loop.create_future()
        res.set_result((b'data', ('127.0.0.1', 12068)))

        tr = self.datagram_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recvfrom.assert_called_with(self.sock, 256 * 1024)
        self.protocol.datagram_received.assert_called_with(
            b'data', ('127.0.0.1', 12068))
        close_transport(tr)

    def test_datagram_loop_reading_no_data(self):
        res = self.loop.create_future()
        res.set_result((b'', ('127.0.0.1', 12068)))

        tr = self.datagram_transport()
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertTrue(self.loop._proactor.recvfrom.called)
        self.assertFalse(self.protocol.error_received.called)
        self.assertFalse(tr.close.called)
        close_transport(tr)

    def test_datagram_loop_reading_aborted(self):
        err = self.loop._proactor.recvfrom.side_effect = (
            ConnectionAbortedError())

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr._protocol.error_received = mock.Mock()
        tr._loop_reading()
        tr._protocol.error_received.assert_called_with(err)
        close_transport(tr)

    def test_datagram_loop_writing_aborted(self):
        err = self.loop._proactor.sendto.side_effect = (
            ConnectionAbortedError())

        tr = self.datagram_transport()
        tr._fatal_error = mock.Mock()
        tr._protocol.error_received = mock.Mock()
        tr._buffer.appendleft((b'Hello', ('127.0.0.1', 12068)))
        tr._loop_writing()
        tr._protocol.error_received.assert_called_with(err)
        close_transport(tr)


@unittest.skipIf(sys.platform != 'win32',
                 'Proactor is supported on Windows only')
class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    """Tests for the native sendfile path, using a real ProactorEventLoop."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Protocol that records received data and when the connection ends."""

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Completed in connection_lost(); awaited by wait_closed().
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)
        self.addCleanup(self.loop.close)
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1 KiB send/receive buffers."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
        if cleanup:
            self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a local server and connect a client socket to it.

        Returns the connected client socket and the server-side protocol.
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        port = socket_helper.find_unused_port()
        srv_sock = self.make_socket(cleanup=False)
        srv_sock.bind(('127.0.0.1', port))
        server = self.run_loop(self.loop.create_server(
            lambda: proto, sock=srv_sock))
        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))

        def cleanup():
            if proto.transport is not None:
                # can be None if the task was cancelled before
                # connection_made callback
                proto.transport.close()
                self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test_sock_sendfile_not_a_file(self):
        sock, proto = self.prepare()
        f = object()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_iobuffer(self):
        sock, proto = self.prepare()
        f = io.BytesIO()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_regular_file(self):
        sock, proto = self.prepare()
        f = mock.Mock()
        f.fileno.return_value = -1
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)


if __name__ == '__main__':
    unittest.main()