1"""Tests for base_events.py""" 2 3import errno 4import logging 5import math 6import os 7import socket 8import sys 9import threading 10import time 11import unittest 12from unittest import mock 13 14import asyncio 15from asyncio import base_events 16from asyncio import constants 17from asyncio import events 18from test.test_asyncio import utils as test_utils 19from test import support 20from test.support.script_helper import assert_python_ok 21 22 23MOCK_ANY = mock.ANY 24PY34 = sys.version_info >= (3, 4) 25 26 27def mock_socket_module(): 28 m_socket = mock.MagicMock(spec=socket) 29 for name in ( 30 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP', 31 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton' 32 ): 33 if hasattr(socket, name): 34 setattr(m_socket, name, getattr(socket, name)) 35 else: 36 delattr(m_socket, name) 37 38 m_socket.socket = mock.MagicMock() 39 m_socket.socket.return_value = test_utils.mock_nonblocking_socket() 40 m_socket.getaddrinfo._is_coroutine = False 41 42 return m_socket 43 44 45def patch_socket(f): 46 return mock.patch('asyncio.base_events.socket', 47 new_callable=mock_socket_module)(f) 48 49 50class BaseEventTests(test_utils.TestCase): 51 52 def test_ipaddr_info(self): 53 UNSPEC = socket.AF_UNSPEC 54 INET = socket.AF_INET 55 INET6 = socket.AF_INET6 56 STREAM = socket.SOCK_STREAM 57 DGRAM = socket.SOCK_DGRAM 58 TCP = socket.IPPROTO_TCP 59 UDP = socket.IPPROTO_UDP 60 61 self.assertEqual( 62 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 63 base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP)) 64 65 self.assertEqual( 66 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 67 base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP)) 68 69 self.assertEqual( 70 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 71 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP)) 72 73 self.assertEqual( 74 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 75 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP)) 76 77 # Socket type STREAM implies TCP protocol. 78 self.assertEqual( 79 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 80 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0)) 81 82 # Socket type DGRAM implies UDP protocol. 83 self.assertEqual( 84 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 85 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0)) 86 87 # No socket type. 88 self.assertIsNone( 89 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) 90 91 # IPv4 address with family IPv6. 92 self.assertIsNone( 93 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) 94 95 self.assertEqual( 96 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 97 base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP)) 98 99 self.assertEqual( 100 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 101 base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP)) 102 103 # IPv6 address with family IPv4. 104 self.assertIsNone( 105 base_events._ipaddr_info('::3', 1, INET, STREAM, TCP)) 106 107 # IPv6 address with zone index. 108 self.assertIsNone( 109 base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP)) 110 111 def test_port_parameter_types(self): 112 # Test obscure kinds of arguments for "port". 113 INET = socket.AF_INET 114 STREAM = socket.SOCK_STREAM 115 TCP = socket.IPPROTO_TCP 116 117 self.assertEqual( 118 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 119 base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP)) 120 121 self.assertEqual( 122 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 123 base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP)) 124 125 self.assertEqual( 126 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 127 base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP)) 128 129 self.assertEqual( 130 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 131 base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP)) 132 133 self.assertEqual( 134 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 135 base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) 136 137 @patch_socket 138 def test_ipaddr_info_no_inet_pton(self, m_socket): 139 del m_socket.inet_pton 140 self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1, 141 socket.AF_INET, 142 socket.SOCK_STREAM, 143 socket.IPPROTO_TCP)) 144 145 146class BaseEventLoopTests(test_utils.TestCase): 147 148 def setUp(self): 149 super().setUp() 150 self.loop = base_events.BaseEventLoop() 151 self.loop._selector = mock.Mock() 152 self.loop._selector.select.return_value = () 153 self.set_event_loop(self.loop) 154 155 def test_not_implemented(self): 156 m = mock.Mock() 157 self.assertRaises( 158 NotImplementedError, 159 self.loop._make_socket_transport, m, m) 160 self.assertRaises( 161 NotImplementedError, 162 self.loop._make_ssl_transport, m, m, m, m) 163 self.assertRaises( 164 NotImplementedError, 165 self.loop._make_datagram_transport, m, m) 166 self.assertRaises( 167 NotImplementedError, self.loop._process_events, []) 168 self.assertRaises( 169 NotImplementedError, self.loop._write_to_self) 170 self.assertRaises( 171 NotImplementedError, 172 self.loop._make_read_pipe_transport, m, m) 173 self.assertRaises( 174 NotImplementedError, 175 self.loop._make_write_pipe_transport, m, m) 176 gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m) 177 with self.assertRaises(NotImplementedError): 178 gen.send(None) 179 180 def test_close(self): 181 self.assertFalse(self.loop.is_closed()) 182 self.loop.close() 183 self.assertTrue(self.loop.is_closed()) 184 185 # it should be possible to call close() more than once 186 self.loop.close() 187 self.loop.close() 188 189 # operation blocked when the loop is closed 190 f = asyncio.Future(loop=self.loop) 191 self.assertRaises(RuntimeError, self.loop.run_forever) 192 self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 193 194 def test__add_callback_handle(self): 195 h = asyncio.Handle(lambda: False, (), self.loop, None) 196 197 self.loop._add_callback(h) 198 self.assertFalse(self.loop._scheduled) 199 self.assertIn(h, self.loop._ready) 200 201 def test__add_callback_cancelled_handle(self): 202 h = asyncio.Handle(lambda: False, (), self.loop, None) 203 h.cancel() 204 205 self.loop._add_callback(h) 206 self.assertFalse(self.loop._scheduled) 207 self.assertFalse(self.loop._ready) 208 209 def test_set_default_executor(self): 210 executor = mock.Mock() 211 self.loop.set_default_executor(executor) 212 self.assertIs(executor, self.loop._default_executor) 213 214 def test_call_soon(self): 215 def cb(): 216 pass 217 218 h = self.loop.call_soon(cb) 219 self.assertEqual(h._callback, cb) 220 self.assertIsInstance(h, asyncio.Handle) 221 self.assertIn(h, self.loop._ready) 222 223 def test_call_soon_non_callable(self): 224 self.loop.set_debug(True) 225 with self.assertRaisesRegex(TypeError, 'a callable object'): 226 self.loop.call_soon(1) 227 228 def test_call_later(self): 229 def cb(): 230 pass 231 232 h = self.loop.call_later(10.0, cb) 233 self.assertIsInstance(h, asyncio.TimerHandle) 234 self.assertIn(h, self.loop._scheduled) 235 self.assertNotIn(h, self.loop._ready) 236 237 def test_call_later_negative_delays(self): 238 calls = [] 239 240 def cb(arg): 241 calls.append(arg) 242 243 self.loop._process_events = mock.Mock() 244 self.loop.call_later(-1, cb, 'a') 245 self.loop.call_later(-2, cb, 'b') 246 test_utils.run_briefly(self.loop) 247 self.assertEqual(calls, ['b', 'a']) 248 249 def test_time_and_call_at(self): 250 def cb(): 251 self.loop.stop() 252 253 self.loop._process_events = mock.Mock() 254 delay = 0.1 255 256 when = self.loop.time() + delay 257 self.loop.call_at(when, cb) 258 t0 = self.loop.time() 259 self.loop.run_forever() 260 dt = self.loop.time() - t0 261 262 # 50 ms: maximum granularity of the event loop 263 self.assertGreaterEqual(dt, delay - 0.050, dt) 264 # tolerate a difference of +800 ms because some Python buildbots 265 # are really slow 266 self.assertLessEqual(dt, 0.9, dt) 267 268 def check_thread(self, loop, debug): 269 def cb(): 270 pass 271 272 loop.set_debug(debug) 273 if debug: 274 msg = ("Non-thread-safe operation invoked on an event loop other " 275 "than the current one") 276 with self.assertRaisesRegex(RuntimeError, msg): 277 loop.call_soon(cb) 278 with self.assertRaisesRegex(RuntimeError, msg): 279 loop.call_later(60, cb) 280 with self.assertRaisesRegex(RuntimeError, msg): 281 loop.call_at(loop.time() + 60, cb) 282 else: 283 loop.call_soon(cb) 284 loop.call_later(60, cb) 285 loop.call_at(loop.time() + 60, cb) 286 287 def test_check_thread(self): 288 def check_in_thread(loop, event, debug, create_loop, fut): 289 # wait until the event loop is running 290 event.wait() 291 292 try: 293 if create_loop: 294 loop2 = base_events.BaseEventLoop() 295 try: 296 asyncio.set_event_loop(loop2) 297 self.check_thread(loop, debug) 298 finally: 299 asyncio.set_event_loop(None) 300 loop2.close() 301 else: 302 self.check_thread(loop, debug) 303 except Exception as exc: 304 loop.call_soon_threadsafe(fut.set_exception, exc) 305 else: 306 loop.call_soon_threadsafe(fut.set_result, None) 307 308 def test_thread(loop, debug, create_loop=False): 309 event = threading.Event() 310 fut = asyncio.Future(loop=loop) 311 loop.call_soon(event.set) 312 args = (loop, event, debug, create_loop, fut) 313 thread = threading.Thread(target=check_in_thread, args=args) 314 thread.start() 315 loop.run_until_complete(fut) 316 thread.join() 317 318 self.loop._process_events = mock.Mock() 319 self.loop._write_to_self = mock.Mock() 320 321 # raise RuntimeError if the thread has no event loop 322 test_thread(self.loop, True) 323 324 # check disabled if debug mode is disabled 325 test_thread(self.loop, False) 326 327 # raise RuntimeError if the event loop of the thread is not the called 328 # event loop 329 test_thread(self.loop, True, create_loop=True) 330 331 # check disabled if debug mode is disabled 332 test_thread(self.loop, False, create_loop=True) 333 334 def test__run_once(self): 335 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 336 self.loop, None) 337 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 338 self.loop, None) 339 340 h1.cancel() 341 342 self.loop._process_events = mock.Mock() 343 self.loop._scheduled.append(h1) 344 self.loop._scheduled.append(h2) 345 self.loop._run_once() 346 347 t = self.loop._selector.select.call_args[0][0] 348 self.assertTrue(9.5 < t < 10.5, t) 349 self.assertEqual([h2], self.loop._scheduled) 350 self.assertTrue(self.loop._process_events.called) 351 352 def test_set_debug(self): 353 self.loop.set_debug(True) 354 self.assertTrue(self.loop.get_debug()) 355 self.loop.set_debug(False) 356 self.assertFalse(self.loop.get_debug()) 357 358 @mock.patch('asyncio.base_events.logger') 359 def test__run_once_logging(self, m_logger): 360 def slow_select(timeout): 361 # Sleep a bit longer than a second to avoid timer resolution 362 # issues. 363 time.sleep(1.1) 364 return [] 365 366 # logging needs debug flag 367 self.loop.set_debug(True) 368 369 # Log to INFO level if timeout > 1.0 sec. 370 self.loop._selector.select = slow_select 371 self.loop._process_events = mock.Mock() 372 self.loop._run_once() 373 self.assertEqual(logging.INFO, m_logger.log.call_args[0][0]) 374 375 def fast_select(timeout): 376 time.sleep(0.001) 377 return [] 378 379 self.loop._selector.select = fast_select 380 self.loop._run_once() 381 self.assertEqual(logging.DEBUG, m_logger.log.call_args[0][0]) 382 383 def test__run_once_schedule_handle(self): 384 handle = None 385 processed = False 386 387 def cb(loop): 388 nonlocal processed, handle 389 processed = True 390 handle = loop.call_soon(lambda: True) 391 392 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 393 self.loop, None) 394 395 self.loop._process_events = mock.Mock() 396 self.loop._scheduled.append(h) 397 self.loop._run_once() 398 399 self.assertTrue(processed) 400 self.assertEqual([handle], list(self.loop._ready)) 401 402 def test__run_once_cancelled_event_cleanup(self): 403 self.loop._process_events = mock.Mock() 404 405 self.assertTrue( 406 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 407 408 def cb(): 409 pass 410 411 # Set up one "blocking" event that will not be cancelled to 412 # ensure later cancelled events do not make it to the head 413 # of the queue and get cleaned. 414 not_cancelled_count = 1 415 self.loop.call_later(3000, cb) 416 417 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 418 # cancelled handles, ensure they aren't removed 419 420 cancelled_count = 2 421 for x in range(2): 422 h = self.loop.call_later(3600, cb) 423 h.cancel() 424 425 # Add some cancelled events that will be at head and removed 426 cancelled_count += 2 427 for x in range(2): 428 h = self.loop.call_later(100, cb) 429 h.cancel() 430 431 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 432 self.assertLessEqual(cancelled_count + not_cancelled_count, 433 base_events._MIN_SCHEDULED_TIMER_HANDLES) 434 435 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 436 437 self.loop._run_once() 438 439 cancelled_count -= 2 440 441 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 442 443 self.assertEqual(len(self.loop._scheduled), 444 cancelled_count + not_cancelled_count) 445 446 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 447 # so that deletion of cancelled events will occur on next _run_once 448 add_cancel_count = int(math.ceil( 449 base_events._MIN_SCHEDULED_TIMER_HANDLES * 450 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 451 452 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 453 add_cancel_count, 0) 454 455 # Add some events that will not be cancelled 456 not_cancelled_count += add_not_cancel_count 457 for x in range(add_not_cancel_count): 458 self.loop.call_later(3600, cb) 459 460 # Add enough cancelled events 461 cancelled_count += add_cancel_count 462 for x in range(add_cancel_count): 463 h = self.loop.call_later(3600, cb) 464 h.cancel() 465 466 # Ensure all handles are still scheduled 467 self.assertEqual(len(self.loop._scheduled), 468 cancelled_count + not_cancelled_count) 469 470 self.loop._run_once() 471 472 # Ensure cancelled events were removed 473 self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 474 475 # Ensure only uncancelled events remain scheduled 476 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 477 478 def test_run_until_complete_type_error(self): 479 self.assertRaises(TypeError, 480 self.loop.run_until_complete, 'blah') 481 482 def test_run_until_complete_loop(self): 483 task = asyncio.Future(loop=self.loop) 484 other_loop = self.new_test_loop() 485 self.addCleanup(other_loop.close) 486 self.assertRaises(ValueError, 487 other_loop.run_until_complete, task) 488 489 def test_run_until_complete_loop_orphan_future_close_loop(self): 490 class ShowStopper(BaseException): 491 pass 492 493 async def foo(delay): 494 await asyncio.sleep(delay, loop=self.loop) 495 496 def throw(): 497 raise ShowStopper 498 499 self.loop._process_events = mock.Mock() 500 self.loop.call_soon(throw) 501 try: 502 self.loop.run_until_complete(foo(0.1)) 503 except ShowStopper: 504 pass 505 506 # This call fails if run_until_complete does not clean up 507 # done-callback for the previous future. 508 self.loop.run_until_complete(foo(0.2)) 509 510 def test_subprocess_exec_invalid_args(self): 511 args = [sys.executable, '-c', 'pass'] 512 513 # missing program parameter (empty args) 514 self.assertRaises(TypeError, 515 self.loop.run_until_complete, self.loop.subprocess_exec, 516 asyncio.SubprocessProtocol) 517 518 # expected multiple arguments, not a list 519 self.assertRaises(TypeError, 520 self.loop.run_until_complete, self.loop.subprocess_exec, 521 asyncio.SubprocessProtocol, args) 522 523 # program arguments must be strings, not int 524 self.assertRaises(TypeError, 525 self.loop.run_until_complete, self.loop.subprocess_exec, 526 asyncio.SubprocessProtocol, sys.executable, 123) 527 528 # universal_newlines, shell, bufsize must not be set 529 self.assertRaises(TypeError, 530 self.loop.run_until_complete, self.loop.subprocess_exec, 531 asyncio.SubprocessProtocol, *args, universal_newlines=True) 532 self.assertRaises(TypeError, 533 self.loop.run_until_complete, self.loop.subprocess_exec, 534 asyncio.SubprocessProtocol, *args, shell=True) 535 self.assertRaises(TypeError, 536 self.loop.run_until_complete, self.loop.subprocess_exec, 537 asyncio.SubprocessProtocol, *args, bufsize=4096) 538 539 def test_subprocess_shell_invalid_args(self): 540 # expected a string, not an int or a list 541 self.assertRaises(TypeError, 542 self.loop.run_until_complete, self.loop.subprocess_shell, 543 asyncio.SubprocessProtocol, 123) 544 self.assertRaises(TypeError, 545 self.loop.run_until_complete, self.loop.subprocess_shell, 546 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 547 548 # universal_newlines, shell, bufsize must not be set 549 self.assertRaises(TypeError, 550 self.loop.run_until_complete, self.loop.subprocess_shell, 551 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 552 self.assertRaises(TypeError, 553 self.loop.run_until_complete, self.loop.subprocess_shell, 554 asyncio.SubprocessProtocol, 'exit 0', shell=True) 555 self.assertRaises(TypeError, 556 self.loop.run_until_complete, self.loop.subprocess_shell, 557 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 558 559 def test_default_exc_handler_callback(self): 560 self.loop._process_events = mock.Mock() 561 562 def zero_error(fut): 563 fut.set_result(True) 564 1/0 565 566 # Test call_soon (events.Handle) 567 with mock.patch('asyncio.base_events.logger') as log: 568 fut = asyncio.Future(loop=self.loop) 569 self.loop.call_soon(zero_error, fut) 570 fut.add_done_callback(lambda fut: self.loop.stop()) 571 self.loop.run_forever() 572 log.error.assert_called_with( 573 test_utils.MockPattern('Exception in callback.*zero'), 574 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 575 576 # Test call_later (events.TimerHandle) 577 with mock.patch('asyncio.base_events.logger') as log: 578 fut = asyncio.Future(loop=self.loop) 579 self.loop.call_later(0.01, zero_error, fut) 580 fut.add_done_callback(lambda fut: self.loop.stop()) 581 self.loop.run_forever() 582 log.error.assert_called_with( 583 test_utils.MockPattern('Exception in callback.*zero'), 584 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 585 586 def test_default_exc_handler_coro(self): 587 self.loop._process_events = mock.Mock() 588 589 @asyncio.coroutine 590 def zero_error_coro(): 591 yield from asyncio.sleep(0.01, loop=self.loop) 592 1/0 593 594 # Test Future.__del__ 595 with mock.patch('asyncio.base_events.logger') as log: 596 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 597 fut.add_done_callback(lambda *args: self.loop.stop()) 598 self.loop.run_forever() 599 fut = None # Trigger Future.__del__ or futures._TracebackLogger 600 support.gc_collect() 601 if PY34: 602 # Future.__del__ in Python 3.4 logs error with 603 # an actual exception context 604 log.error.assert_called_with( 605 test_utils.MockPattern('.*exception was never retrieved'), 606 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 607 else: 608 # futures._TracebackLogger logs only textual traceback 609 log.error.assert_called_with( 610 test_utils.MockPattern( 611 '.*exception was never retrieved.*ZeroDiv'), 612 exc_info=False) 613 614 def test_set_exc_handler_invalid(self): 615 with self.assertRaisesRegex(TypeError, 'A callable object or None'): 616 self.loop.set_exception_handler('spam') 617 618 def test_set_exc_handler_custom(self): 619 def zero_error(): 620 1/0 621 622 def run_loop(): 623 handle = self.loop.call_soon(zero_error) 624 self.loop._run_once() 625 return handle 626 627 self.loop.set_debug(True) 628 self.loop._process_events = mock.Mock() 629 630 self.assertIsNone(self.loop.get_exception_handler()) 631 mock_handler = mock.Mock() 632 self.loop.set_exception_handler(mock_handler) 633 self.assertIs(self.loop.get_exception_handler(), mock_handler) 634 handle = run_loop() 635 mock_handler.assert_called_with(self.loop, { 636 'exception': MOCK_ANY, 637 'message': test_utils.MockPattern( 638 'Exception in callback.*zero_error'), 639 'handle': handle, 640 'source_traceback': handle._source_traceback, 641 }) 642 mock_handler.reset_mock() 643 644 self.loop.set_exception_handler(None) 645 with mock.patch('asyncio.base_events.logger') as log: 646 run_loop() 647 log.error.assert_called_with( 648 test_utils.MockPattern( 649 'Exception in callback.*zero'), 650 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 651 652 assert not mock_handler.called 653 654 def test_set_exc_handler_broken(self): 655 def run_loop(): 656 def zero_error(): 657 1/0 658 self.loop.call_soon(zero_error) 659 self.loop._run_once() 660 661 def handler(loop, context): 662 raise AttributeError('spam') 663 664 self.loop._process_events = mock.Mock() 665 666 self.loop.set_exception_handler(handler) 667 668 with mock.patch('asyncio.base_events.logger') as log: 669 run_loop() 670 log.error.assert_called_with( 671 test_utils.MockPattern( 672 'Unhandled error in exception handler'), 673 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 674 675 def test_default_exc_handler_broken(self): 676 _context = None 677 678 class Loop(base_events.BaseEventLoop): 679 680 _selector = mock.Mock() 681 _process_events = mock.Mock() 682 683 def default_exception_handler(self, context): 684 nonlocal _context 685 _context = context 686 # Simulates custom buggy "default_exception_handler" 687 raise ValueError('spam') 688 689 loop = Loop() 690 self.addCleanup(loop.close) 691 asyncio.set_event_loop(loop) 692 693 def run_loop(): 694 def zero_error(): 695 1/0 696 loop.call_soon(zero_error) 697 loop._run_once() 698 699 with mock.patch('asyncio.base_events.logger') as log: 700 run_loop() 701 log.error.assert_called_with( 702 'Exception in default exception handler', 703 exc_info=True) 704 705 def custom_handler(loop, context): 706 raise ValueError('ham') 707 708 _context = None 709 loop.set_exception_handler(custom_handler) 710 with mock.patch('asyncio.base_events.logger') as log: 711 run_loop() 712 log.error.assert_called_with( 713 test_utils.MockPattern('Exception in default exception.*' 714 'while handling.*in custom'), 715 exc_info=True) 716 717 # Check that original context was passed to default 718 # exception handler. 719 self.assertIn('context', _context) 720 self.assertIs(type(_context['context']['exception']), 721 ZeroDivisionError) 722 723 def test_set_task_factory_invalid(self): 724 with self.assertRaisesRegex( 725 TypeError, 'task factory must be a callable or None'): 726 727 self.loop.set_task_factory(1) 728 729 self.assertIsNone(self.loop.get_task_factory()) 730 731 def test_set_task_factory(self): 732 self.loop._process_events = mock.Mock() 733 734 class MyTask(asyncio.Task): 735 pass 736 737 @asyncio.coroutine 738 def coro(): 739 pass 740 741 factory = lambda loop, coro: MyTask(coro, loop=loop) 742 743 self.assertIsNone(self.loop.get_task_factory()) 744 self.loop.set_task_factory(factory) 745 self.assertIs(self.loop.get_task_factory(), factory) 746 747 task = self.loop.create_task(coro()) 748 self.assertTrue(isinstance(task, MyTask)) 749 self.loop.run_until_complete(task) 750 751 self.loop.set_task_factory(None) 752 self.assertIsNone(self.loop.get_task_factory()) 753 754 task = self.loop.create_task(coro()) 755 self.assertTrue(isinstance(task, asyncio.Task)) 756 self.assertFalse(isinstance(task, MyTask)) 757 self.loop.run_until_complete(task) 758 759 def test_env_var_debug(self): 760 code = '\n'.join(( 761 'import asyncio', 762 'loop = asyncio.get_event_loop()', 763 'print(loop.get_debug())')) 764 765 # Test with -E to not fail if the unit test was run with 766 # PYTHONASYNCIODEBUG set to a non-empty string 767 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 768 self.assertEqual(stdout.rstrip(), b'False') 769 770 sts, stdout, stderr = assert_python_ok('-c', code, 771 PYTHONASYNCIODEBUG='', 772 PYTHONDEVMODE='') 773 self.assertEqual(stdout.rstrip(), b'False') 774 775 sts, stdout, stderr = assert_python_ok('-c', code, 776 PYTHONASYNCIODEBUG='1', 777 PYTHONDEVMODE='') 778 self.assertEqual(stdout.rstrip(), b'True') 779 780 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 781 PYTHONASYNCIODEBUG='1') 782 self.assertEqual(stdout.rstrip(), b'False') 783 784 # -X dev 785 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 786 '-c', code) 787 self.assertEqual(stdout.rstrip(), b'True') 788 789 def test_create_task(self): 790 class MyTask(asyncio.Task): 791 pass 792 793 @asyncio.coroutine 794 def test(): 795 pass 796 797 class EventLoop(base_events.BaseEventLoop): 798 def create_task(self, coro): 799 return MyTask(coro, loop=loop) 800 801 loop = EventLoop() 802 self.set_event_loop(loop) 803 804 coro = test() 805 task = asyncio.ensure_future(coro, loop=loop) 806 self.assertIsInstance(task, MyTask) 807 808 # make warnings quiet 809 task._log_destroy_pending = False 810 coro.close() 811 812 def test_run_forever_keyboard_interrupt(self): 813 # Python issue #22601: ensure that the temporary task created by 814 # run_forever() consumes the KeyboardInterrupt and so don't log 815 # a warning 816 @asyncio.coroutine 817 def raise_keyboard_interrupt(): 818 raise KeyboardInterrupt 819 820 self.loop._process_events = mock.Mock() 821 self.loop.call_exception_handler = mock.Mock() 822 823 try: 824 self.loop.run_until_complete(raise_keyboard_interrupt()) 825 except KeyboardInterrupt: 826 pass 827 self.loop.close() 828 support.gc_collect() 829 830 self.assertFalse(self.loop.call_exception_handler.called) 831 832 def test_run_until_complete_baseexception(self): 833 # Python issue #22429: run_until_complete() must not schedule a pending 834 # call to stop() if the future raised a BaseException 835 @asyncio.coroutine 836 def raise_keyboard_interrupt(): 837 raise KeyboardInterrupt 838 839 self.loop._process_events = mock.Mock() 840 841 try: 842 self.loop.run_until_complete(raise_keyboard_interrupt()) 843 except KeyboardInterrupt: 844 pass 845 846 def func(): 847 self.loop.stop() 848 func.called = True 849 func.called = False 850 try: 851 self.loop.call_soon(func) 852 self.loop.run_forever() 853 except KeyboardInterrupt: 854 pass 855 self.assertTrue(func.called) 856 857 def test_single_selecter_event_callback_after_stopping(self): 858 # Python issue #25593: A stopped event loop may cause event callbacks 859 # to run more than once. 860 event_sentinel = object() 861 callcount = 0 862 doer = None 863 864 def proc_events(event_list): 865 nonlocal doer 866 if event_sentinel in event_list: 867 doer = self.loop.call_soon(do_event) 868 869 def do_event(): 870 nonlocal callcount 871 callcount += 1 872 self.loop.call_soon(clear_selector) 873 874 def clear_selector(): 875 doer.cancel() 876 self.loop._selector.select.return_value = () 877 878 self.loop._process_events = proc_events 879 self.loop._selector.select.return_value = (event_sentinel,) 880 881 for i in range(1, 3): 882 with self.subTest('Loop %d/2' % i): 883 self.loop.call_soon(self.loop.stop) 884 self.loop.run_forever() 885 self.assertEqual(callcount, 1) 886 887 def test_run_once(self): 888 # Simple test for test_utils.run_once(). It may seem strange 889 # to have a test for this (the function isn't even used!) but 890 # it's a de-factor standard API for library tests. This tests 891 # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 892 count = 0 893 894 def callback(): 895 nonlocal count 896 count += 1 897 898 self.loop._process_events = mock.Mock() 899 self.loop.call_soon(callback) 900 test_utils.run_once(self.loop) 901 self.assertEqual(count, 1) 902 903 def test_run_forever_pre_stopped(self): 904 # Test that the old idiom for pre-stopping the loop works. 905 self.loop._process_events = mock.Mock() 906 self.loop.stop() 907 self.loop.run_forever() 908 self.loop._selector.select.assert_called_once_with(0) 909 910 async def leave_unfinalized_asyncgen(self): 911 # Create an async generator, iterate it partially, and leave it 912 # to be garbage collected. 913 # Used in async generator finalization tests. 914 # Depends on implementation details of garbage collector. Changes 915 # in gc may break this function. 916 status = {'started': False, 917 'stopped': False, 918 'finalized': False} 919 920 async def agen(): 921 status['started'] = True 922 try: 923 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 924 yield item 925 finally: 926 status['finalized'] = True 927 928 ag = agen() 929 ai = ag.__aiter__() 930 931 async def iter_one(): 932 try: 933 item = await ai.__anext__() 934 except StopAsyncIteration: 935 return 936 if item == 'THREE': 937 status['stopped'] = True 938 return 939 asyncio.create_task(iter_one()) 940 941 asyncio.create_task(iter_one()) 942 return status 943 944 def test_asyncgen_finalization_by_gc(self): 945 # Async generators should be finalized when garbage collected. 946 self.loop._process_events = mock.Mock() 947 self.loop._write_to_self = mock.Mock() 948 with support.disable_gc(): 949 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 950 while not status['stopped']: 951 test_utils.run_briefly(self.loop) 952 self.assertTrue(status['started']) 953 self.assertTrue(status['stopped']) 954 self.assertFalse(status['finalized']) 955 support.gc_collect() 956 test_utils.run_briefly(self.loop) 957 self.assertTrue(status['finalized']) 958 959 def test_asyncgen_finalization_by_gc_in_other_thread(self): 960 # Python issue 34769: If garbage collector runs in another 961 # thread, async generators will not finalize in debug 962 # mode. 963 self.loop._process_events = mock.Mock() 964 self.loop._write_to_self = mock.Mock() 965 self.loop.set_debug(True) 966 with support.disable_gc(): 967 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 968 while not status['stopped']: 969 test_utils.run_briefly(self.loop) 970 self.assertTrue(status['started']) 971 self.assertTrue(status['stopped']) 972 self.assertFalse(status['finalized']) 973 self.loop.run_until_complete( 974 self.loop.run_in_executor(None, support.gc_collect)) 975 test_utils.run_briefly(self.loop) 976 self.assertTrue(status['finalized']) 977 978 979class MyProto(asyncio.Protocol): 980 done = None 981 982 def __init__(self, create_future=False): 983 self.state = 'INITIAL' 984 self.nbytes = 0 985 if create_future: 986 self.done = asyncio.Future() 987 988 def connection_made(self, transport): 989 self.transport = transport 990 assert self.state == 'INITIAL', self.state 991 self.state = 'CONNECTED' 992 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 993 994 def data_received(self, data): 995 assert self.state == 'CONNECTED', self.state 996 self.nbytes += len(data) 997 998 def eof_received(self): 999 assert self.state == 'CONNECTED', self.state 1000 self.state = 'EOF' 1001 1002 def connection_lost(self, exc): 1003 assert self.state in ('CONNECTED', 'EOF'), self.state 1004 self.state = 'CLOSED' 1005 if self.done: 1006 self.done.set_result(None) 1007 1008 1009class MyDatagramProto(asyncio.DatagramProtocol): 1010 done = None 1011 1012 def __init__(self, create_future=False, loop=None): 1013 self.state = 'INITIAL' 1014 self.nbytes = 0 1015 if create_future: 1016 self.done = asyncio.Future(loop=loop) 1017 1018 def connection_made(self, transport): 1019 self.transport = transport 1020 assert self.state == 'INITIAL', self.state 1021 self.state = 'INITIALIZED' 1022 1023 def datagram_received(self, data, addr): 1024 assert self.state == 'INITIALIZED', self.state 1025 self.nbytes += len(data) 1026 1027 def error_received(self, exc): 1028 assert self.state == 'INITIALIZED', self.state 1029 1030 def connection_lost(self, exc): 1031 assert self.state == 'INITIALIZED', self.state 1032 self.state = 'CLOSED' 1033 if self.done: 1034 self.done.set_result(None) 1035 1036 1037class BaseEventLoopWithSelectorTests(test_utils.TestCase): 1038 1039 def setUp(self): 1040 super().setUp() 1041 self.loop = asyncio.new_event_loop() 1042 self.set_event_loop(self.loop) 1043 1044 @mock.patch('socket.getnameinfo') 1045 def test_getnameinfo(self, m_gai): 1046 m_gai.side_effect = lambda *args: 42 1047 r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123))) 1048 self.assertEqual(r, 42) 1049 1050 @patch_socket 1051 def test_create_connection_multiple_errors(self, m_socket): 1052 1053 class MyProto(asyncio.Protocol): 1054 pass 1055 1056 @asyncio.coroutine 1057 def getaddrinfo(*args, **kw): 1058 yield from [] 1059 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1060 (2, 1, 6, '', ('107.6.106.82', 80))] 1061 1062 def getaddrinfo_task(*args, **kwds): 1063 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1064 1065 idx = -1 1066 errors = ['err1', 'err2'] 1067 1068 def _socket(*args, **kw): 1069 nonlocal idx, errors 1070 idx += 1 1071 raise OSError(errors[idx]) 1072 1073 m_socket.socket = _socket 1074 1075 self.loop.getaddrinfo = getaddrinfo_task 1076 1077 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1078 with self.assertRaises(OSError) as cm: 1079 self.loop.run_until_complete(coro) 1080 1081 self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') 1082 1083 @patch_socket 1084 def test_create_connection_timeout(self, m_socket): 1085 # Ensure that the socket is closed on timeout 1086 sock = mock.Mock() 1087 m_socket.socket.return_value = sock 1088 1089 def getaddrinfo(*args, **kw): 1090 fut = asyncio.Future(loop=self.loop) 1091 addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '', 1092 ('127.0.0.1', 80)) 1093 fut.set_result([addr]) 1094 return fut 1095 self.loop.getaddrinfo = getaddrinfo 1096 1097 with mock.patch.object(self.loop, 'sock_connect', 1098 side_effect=asyncio.TimeoutError): 1099 coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) 1100 with self.assertRaises(asyncio.TimeoutError): 1101 self.loop.run_until_complete(coro) 1102 self.assertTrue(sock.close.called) 1103 1104 def test_create_connection_host_port_sock(self): 1105 coro = self.loop.create_connection( 1106 MyProto, 'example.com', 80, sock=object()) 1107 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1108 1109 def test_create_connection_wrong_sock(self): 1110 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1111 with sock: 1112 coro = self.loop.create_connection(MyProto, sock=sock) 1113 with self.assertRaisesRegex(ValueError, 1114 'A Stream Socket was expected'): 1115 self.loop.run_until_complete(coro) 1116 1117 def test_create_server_wrong_sock(self): 1118 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1119 with sock: 1120 coro = self.loop.create_server(MyProto, sock=sock) 1121 with self.assertRaisesRegex(ValueError, 1122 'A Stream Socket was expected'): 1123 self.loop.run_until_complete(coro) 1124 1125 def test_create_server_ssl_timeout_for_plain_socket(self): 1126 coro = self.loop.create_server( 1127 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1128 with self.assertRaisesRegex( 1129 ValueError, 1130 'ssl_handshake_timeout is only meaningful with ssl'): 1131 self.loop.run_until_complete(coro) 1132 1133 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 1134 'no socket.SOCK_NONBLOCK (linux only)') 1135 def test_create_server_stream_bittype(self): 1136 sock = socket.socket( 1137 socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 1138 with sock: 1139 coro = self.loop.create_server(lambda: None, sock=sock) 1140 srv = self.loop.run_until_complete(coro) 1141 srv.close() 1142 self.loop.run_until_complete(srv.wait_closed()) 1143 1144 @unittest.skipUnless(hasattr(socket, 'AF_INET6'), 'no IPv6 support') 1145 def test_create_server_ipv6(self): 1146 async def main(): 1147 srv = await asyncio.start_server( 1148 lambda: None, '::1', 0, loop=self.loop) 1149 try: 1150 self.assertGreater(len(srv.sockets), 0) 1151 finally: 1152 srv.close() 1153 await srv.wait_closed() 1154 1155 try: 1156 self.loop.run_until_complete(main()) 1157 except OSError as ex: 1158 if (hasattr(errno, 'EADDRNOTAVAIL') and 1159 ex.errno == errno.EADDRNOTAVAIL): 1160 self.skipTest('failed to bind to ::1') 1161 else: 1162 raise 1163 1164 def test_create_datagram_endpoint_wrong_sock(self): 1165 sock = socket.socket(socket.AF_INET) 1166 with sock: 1167 coro = self.loop.create_datagram_endpoint(MyProto, sock=sock) 1168 with self.assertRaisesRegex(ValueError, 1169 'A UDP Socket was expected'): 1170 self.loop.run_until_complete(coro) 1171 1172 def test_create_connection_no_host_port_sock(self): 1173 coro = self.loop.create_connection(MyProto) 1174 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1175 1176 def test_create_connection_no_getaddrinfo(self): 1177 @asyncio.coroutine 1178 def getaddrinfo(*args, **kw): 1179 yield from [] 1180 1181 def getaddrinfo_task(*args, **kwds): 1182 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1183 1184 self.loop.getaddrinfo = getaddrinfo_task 1185 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1186 self.assertRaises( 1187 OSError, self.loop.run_until_complete, coro) 1188 1189 def test_create_connection_connect_err(self): 1190 async def getaddrinfo(*args, **kw): 1191 return [(2, 1, 6, '', ('107.6.106.82', 80))] 1192 1193 def getaddrinfo_task(*args, **kwds): 1194 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1195 1196 self.loop.getaddrinfo = getaddrinfo_task 1197 self.loop.sock_connect = mock.Mock() 1198 self.loop.sock_connect.side_effect = OSError 1199 1200 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1201 self.assertRaises( 1202 OSError, self.loop.run_until_complete, coro) 1203 1204 def test_create_connection_multiple(self): 1205 @asyncio.coroutine 1206 def getaddrinfo(*args, **kw): 1207 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1208 (2, 1, 6, '', ('0.0.0.2', 80))] 1209 1210 def getaddrinfo_task(*args, **kwds): 1211 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1212 1213 self.loop.getaddrinfo = getaddrinfo_task 1214 self.loop.sock_connect = mock.Mock() 1215 self.loop.sock_connect.side_effect = OSError 1216 1217 coro = self.loop.create_connection( 1218 MyProto, 'example.com', 80, family=socket.AF_INET) 1219 with self.assertRaises(OSError): 1220 self.loop.run_until_complete(coro) 1221 1222 @patch_socket 1223 def test_create_connection_multiple_errors_local_addr(self, m_socket): 1224 1225 def bind(addr): 1226 if addr[0] == '0.0.0.1': 1227 err = OSError('Err') 1228 err.strerror = 'Err' 1229 raise err 1230 1231 m_socket.socket.return_value.bind = bind 1232 1233 @asyncio.coroutine 1234 def getaddrinfo(*args, **kw): 1235 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1236 (2, 1, 6, '', ('0.0.0.2', 80))] 1237 1238 def getaddrinfo_task(*args, **kwds): 1239 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1240 1241 self.loop.getaddrinfo = getaddrinfo_task 1242 self.loop.sock_connect = mock.Mock() 1243 self.loop.sock_connect.side_effect = OSError('Err2') 1244 1245 coro = self.loop.create_connection( 1246 MyProto, 'example.com', 80, family=socket.AF_INET, 1247 local_addr=(None, 8080)) 1248 with self.assertRaises(OSError) as cm: 1249 self.loop.run_until_complete(coro) 1250 1251 self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) 1252 self.assertTrue(m_socket.socket.return_value.close.called) 1253 1254 def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): 1255 # Test the fallback code, even if this system has inet_pton. 1256 if not allow_inet_pton: 1257 del m_socket.inet_pton 1258 1259 m_socket.getaddrinfo = socket.getaddrinfo 1260 sock = m_socket.socket.return_value 1261 1262 self.loop._add_reader = mock.Mock() 1263 self.loop._add_reader._is_coroutine = False 1264 self.loop._add_writer = mock.Mock() 1265 self.loop._add_writer._is_coroutine = False 1266 1267 coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) 1268 t, p = self.loop.run_until_complete(coro) 1269 try: 1270 sock.connect.assert_called_with(('1.2.3.4', 80)) 1271 _, kwargs = m_socket.socket.call_args 1272 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1273 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1274 finally: 1275 t.close() 1276 test_utils.run_briefly(self.loop) # allow transport to close 1277 1278 sock.family = socket.AF_INET6 1279 coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) 1280 t, p = self.loop.run_until_complete(coro) 1281 try: 1282 # Without inet_pton we use getaddrinfo, which transforms ('::1', 80) 1283 # to ('::1', 80, 0, 0). The last 0s are flow info, scope id. 1284 [address] = sock.connect.call_args[0] 1285 host, port = address[:2] 1286 self.assertRegex(host, r'::(0\.)*1') 1287 self.assertEqual(port, 80) 1288 _, kwargs = m_socket.socket.call_args 1289 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1290 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1291 finally: 1292 t.close() 1293 test_utils.run_briefly(self.loop) # allow transport to close 1294 1295 @patch_socket 1296 def test_create_connection_ip_addr(self, m_socket): 1297 self._test_create_connection_ip_addr(m_socket, True) 1298 1299 @patch_socket 1300 def test_create_connection_no_inet_pton(self, m_socket): 1301 self._test_create_connection_ip_addr(m_socket, False) 1302 1303 @patch_socket 1304 def test_create_connection_service_name(self, m_socket): 1305 m_socket.getaddrinfo = socket.getaddrinfo 1306 sock = m_socket.socket.return_value 1307 1308 self.loop._add_reader = mock.Mock() 1309 self.loop._add_reader._is_coroutine = False 1310 self.loop._add_writer = mock.Mock() 1311 self.loop._add_writer._is_coroutine = False 1312 1313 for service, port in ('http', 80), (b'http', 80): 1314 coro = self.loop.create_connection(asyncio.Protocol, 1315 '127.0.0.1', service) 1316 1317 t, p = self.loop.run_until_complete(coro) 1318 try: 1319 sock.connect.assert_called_with(('127.0.0.1', port)) 1320 _, kwargs = m_socket.socket.call_args 1321 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1322 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1323 finally: 1324 t.close() 1325 test_utils.run_briefly(self.loop) # allow transport to close 1326 1327 for service in 'nonsense', b'nonsense': 1328 coro = self.loop.create_connection(asyncio.Protocol, 1329 '127.0.0.1', service) 1330 1331 with self.assertRaises(OSError): 1332 self.loop.run_until_complete(coro) 1333 1334 def test_create_connection_no_local_addr(self): 1335 @asyncio.coroutine 1336 def getaddrinfo(host, *args, **kw): 1337 if host == 'example.com': 1338 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1339 (2, 1, 6, '', ('107.6.106.82', 80))] 1340 else: 1341 return [] 1342 1343 def getaddrinfo_task(*args, **kwds): 1344 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1345 self.loop.getaddrinfo = getaddrinfo_task 1346 1347 coro = self.loop.create_connection( 1348 MyProto, 'example.com', 80, family=socket.AF_INET, 1349 local_addr=(None, 8080)) 1350 self.assertRaises( 1351 OSError, self.loop.run_until_complete, coro) 1352 1353 @patch_socket 1354 def test_create_connection_bluetooth(self, m_socket): 1355 # See http://bugs.python.org/issue27136, fallback to getaddrinfo when 1356 # we can't recognize an address is resolved, e.g. a Bluetooth address. 1357 addr = ('00:01:02:03:04:05', 1) 1358 1359 def getaddrinfo(host, port, *args, **kw): 1360 assert (host, port) == addr 1361 return [(999, 1, 999, '', (addr, 1))] 1362 1363 m_socket.getaddrinfo = getaddrinfo 1364 sock = m_socket.socket() 1365 coro = self.loop.sock_connect(sock, addr) 1366 self.loop.run_until_complete(coro) 1367 1368 def test_create_connection_ssl_server_hostname_default(self): 1369 self.loop.getaddrinfo = mock.Mock() 1370 1371 def mock_getaddrinfo(*args, **kwds): 1372 f = asyncio.Future(loop=self.loop) 1373 f.set_result([(socket.AF_INET, socket.SOCK_STREAM, 1374 socket.SOL_TCP, '', ('1.2.3.4', 80))]) 1375 return f 1376 1377 self.loop.getaddrinfo.side_effect = mock_getaddrinfo 1378 self.loop.sock_connect = mock.Mock() 1379 self.loop.sock_connect.return_value = self.loop.create_future() 1380 self.loop.sock_connect.return_value.set_result(None) 1381 self.loop._make_ssl_transport = mock.Mock() 1382 1383 class _SelectorTransportMock: 1384 _sock = None 1385 1386 def get_extra_info(self, key): 1387 return mock.Mock() 1388 1389 def close(self): 1390 self._sock.close() 1391 1392 def mock_make_ssl_transport(sock, protocol, sslcontext, waiter, 1393 **kwds): 1394 waiter.set_result(None) 1395 transport = _SelectorTransportMock() 1396 transport._sock = sock 1397 return transport 1398 1399 self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport 1400 ANY = mock.ANY 1401 handshake_timeout = object() 1402 # First try the default server_hostname. 1403 self.loop._make_ssl_transport.reset_mock() 1404 coro = self.loop.create_connection( 1405 MyProto, 'python.org', 80, ssl=True, 1406 ssl_handshake_timeout=handshake_timeout) 1407 transport, _ = self.loop.run_until_complete(coro) 1408 transport.close() 1409 self.loop._make_ssl_transport.assert_called_with( 1410 ANY, ANY, ANY, ANY, 1411 server_side=False, 1412 server_hostname='python.org', 1413 ssl_handshake_timeout=handshake_timeout) 1414 # Next try an explicit server_hostname. 1415 self.loop._make_ssl_transport.reset_mock() 1416 coro = self.loop.create_connection( 1417 MyProto, 'python.org', 80, ssl=True, 1418 server_hostname='perl.com', 1419 ssl_handshake_timeout=handshake_timeout) 1420 transport, _ = self.loop.run_until_complete(coro) 1421 transport.close() 1422 self.loop._make_ssl_transport.assert_called_with( 1423 ANY, ANY, ANY, ANY, 1424 server_side=False, 1425 server_hostname='perl.com', 1426 ssl_handshake_timeout=handshake_timeout) 1427 # Finally try an explicit empty server_hostname. 1428 self.loop._make_ssl_transport.reset_mock() 1429 coro = self.loop.create_connection( 1430 MyProto, 'python.org', 80, ssl=True, 1431 server_hostname='', 1432 ssl_handshake_timeout=handshake_timeout) 1433 transport, _ = self.loop.run_until_complete(coro) 1434 transport.close() 1435 self.loop._make_ssl_transport.assert_called_with( 1436 ANY, ANY, ANY, ANY, 1437 server_side=False, 1438 server_hostname='', 1439 ssl_handshake_timeout=handshake_timeout) 1440 1441 def test_create_connection_no_ssl_server_hostname_errors(self): 1442 # When not using ssl, server_hostname must be None. 1443 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1444 server_hostname='') 1445 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1446 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1447 server_hostname='python.org') 1448 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1449 1450 def test_create_connection_ssl_server_hostname_errors(self): 1451 # When using ssl, server_hostname may be None if host is non-empty. 1452 coro = self.loop.create_connection(MyProto, '', 80, ssl=True) 1453 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1454 coro = self.loop.create_connection(MyProto, None, 80, ssl=True) 1455 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1456 sock = socket.socket() 1457 coro = self.loop.create_connection(MyProto, None, None, 1458 ssl=True, sock=sock) 1459 self.addCleanup(sock.close) 1460 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1461 1462 def test_create_connection_ssl_timeout_for_plain_socket(self): 1463 coro = self.loop.create_connection( 1464 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1465 with self.assertRaisesRegex( 1466 ValueError, 1467 'ssl_handshake_timeout is only meaningful with ssl'): 1468 self.loop.run_until_complete(coro) 1469 1470 def test_create_server_empty_host(self): 1471 # if host is empty string use None instead 1472 host = object() 1473 1474 @asyncio.coroutine 1475 def getaddrinfo(*args, **kw): 1476 nonlocal host 1477 host = args[0] 1478 yield from [] 1479 1480 def getaddrinfo_task(*args, **kwds): 1481 return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) 1482 1483 self.loop.getaddrinfo = getaddrinfo_task 1484 fut = self.loop.create_server(MyProto, '', 0) 1485 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1486 self.assertIsNone(host) 1487 1488 def test_create_server_host_port_sock(self): 1489 fut = self.loop.create_server( 1490 MyProto, '0.0.0.0', 0, sock=object()) 1491 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1492 1493 def test_create_server_no_host_port_sock(self): 1494 fut = self.loop.create_server(MyProto) 1495 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1496 1497 def test_create_server_no_getaddrinfo(self): 1498 getaddrinfo = self.loop.getaddrinfo = mock.Mock() 1499 getaddrinfo.return_value = self.loop.create_future() 1500 getaddrinfo.return_value.set_result(None) 1501 1502 f = self.loop.create_server(MyProto, 'python.org', 0) 1503 self.assertRaises(OSError, self.loop.run_until_complete, f) 1504 1505 @patch_socket 1506 def test_create_server_nosoreuseport(self, m_socket): 1507 m_socket.getaddrinfo = socket.getaddrinfo 1508 del m_socket.SO_REUSEPORT 1509 m_socket.socket.return_value = mock.Mock() 1510 1511 f = self.loop.create_server( 1512 MyProto, '0.0.0.0', 0, reuse_port=True) 1513 1514 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1515 1516 @patch_socket 1517 def test_create_server_soreuseport_only_defined(self, m_socket): 1518 m_socket.getaddrinfo = socket.getaddrinfo 1519 m_socket.socket.return_value = mock.Mock() 1520 m_socket.SO_REUSEPORT = -1 1521 1522 f = self.loop.create_server( 1523 MyProto, '0.0.0.0', 0, reuse_port=True) 1524 1525 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1526 1527 @patch_socket 1528 def test_create_server_cant_bind(self, m_socket): 1529 1530 class Err(OSError): 1531 strerror = 'error' 1532 1533 m_socket.getaddrinfo.return_value = [ 1534 (2, 1, 6, '', ('127.0.0.1', 10100))] 1535 m_socket.getaddrinfo._is_coroutine = False 1536 m_sock = m_socket.socket.return_value = mock.Mock() 1537 m_sock.bind.side_effect = Err 1538 1539 fut = self.loop.create_server(MyProto, '0.0.0.0', 0) 1540 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1541 self.assertTrue(m_sock.close.called) 1542 1543 @patch_socket 1544 def test_create_datagram_endpoint_no_addrinfo(self, m_socket): 1545 m_socket.getaddrinfo.return_value = [] 1546 m_socket.getaddrinfo._is_coroutine = False 1547 1548 coro = self.loop.create_datagram_endpoint( 1549 MyDatagramProto, local_addr=('localhost', 0)) 1550 self.assertRaises( 1551 OSError, self.loop.run_until_complete, coro) 1552 1553 def test_create_datagram_endpoint_addr_error(self): 1554 coro = self.loop.create_datagram_endpoint( 1555 MyDatagramProto, local_addr='localhost') 1556 self.assertRaises( 1557 AssertionError, self.loop.run_until_complete, coro) 1558 coro = self.loop.create_datagram_endpoint( 1559 MyDatagramProto, local_addr=('localhost', 1, 2, 3)) 1560 self.assertRaises( 1561 AssertionError, self.loop.run_until_complete, coro) 1562 1563 def test_create_datagram_endpoint_connect_err(self): 1564 self.loop.sock_connect = mock.Mock() 1565 self.loop.sock_connect.side_effect = OSError 1566 1567 coro = self.loop.create_datagram_endpoint( 1568 asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) 1569 self.assertRaises( 1570 OSError, self.loop.run_until_complete, coro) 1571 1572 @patch_socket 1573 def test_create_datagram_endpoint_socket_err(self, m_socket): 1574 m_socket.getaddrinfo = socket.getaddrinfo 1575 m_socket.socket.side_effect = OSError 1576 1577 coro = self.loop.create_datagram_endpoint( 1578 asyncio.DatagramProtocol, family=socket.AF_INET) 1579 self.assertRaises( 1580 OSError, self.loop.run_until_complete, coro) 1581 1582 coro = self.loop.create_datagram_endpoint( 1583 asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0)) 1584 self.assertRaises( 1585 OSError, self.loop.run_until_complete, coro) 1586 1587 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 1588 def test_create_datagram_endpoint_no_matching_family(self): 1589 coro = self.loop.create_datagram_endpoint( 1590 asyncio.DatagramProtocol, 1591 remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) 1592 self.assertRaises( 1593 ValueError, self.loop.run_until_complete, coro) 1594 1595 @patch_socket 1596 def test_create_datagram_endpoint_setblk_err(self, m_socket): 1597 m_socket.socket.return_value.setblocking.side_effect = OSError 1598 1599 coro = self.loop.create_datagram_endpoint( 1600 asyncio.DatagramProtocol, family=socket.AF_INET) 1601 self.assertRaises( 1602 OSError, self.loop.run_until_complete, coro) 1603 self.assertTrue( 1604 m_socket.socket.return_value.close.called) 1605 1606 def test_create_datagram_endpoint_noaddr_nofamily(self): 1607 coro = self.loop.create_datagram_endpoint( 1608 asyncio.DatagramProtocol) 1609 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1610 1611 @patch_socket 1612 def test_create_datagram_endpoint_cant_bind(self, m_socket): 1613 class Err(OSError): 1614 pass 1615 1616 m_socket.getaddrinfo = socket.getaddrinfo 1617 m_sock = m_socket.socket.return_value = mock.Mock() 1618 m_sock.bind.side_effect = Err 1619 1620 fut = self.loop.create_datagram_endpoint( 1621 MyDatagramProto, 1622 local_addr=('127.0.0.1', 0), family=socket.AF_INET) 1623 self.assertRaises(Err, self.loop.run_until_complete, fut) 1624 self.assertTrue(m_sock.close.called) 1625 1626 def test_create_datagram_endpoint_sock(self): 1627 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1628 sock.bind(('127.0.0.1', 0)) 1629 fut = self.loop.create_datagram_endpoint( 1630 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1631 sock=sock) 1632 transport, protocol = self.loop.run_until_complete(fut) 1633 transport.close() 1634 self.loop.run_until_complete(protocol.done) 1635 self.assertEqual('CLOSED', protocol.state) 1636 1637 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 1638 def test_create_datagram_endpoint_sock_unix(self): 1639 fut = self.loop.create_datagram_endpoint( 1640 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1641 family=socket.AF_UNIX) 1642 transport, protocol = self.loop.run_until_complete(fut) 1643 assert transport._sock.family == socket.AF_UNIX 1644 transport.close() 1645 self.loop.run_until_complete(protocol.done) 1646 self.assertEqual('CLOSED', protocol.state) 1647 1648 def test_create_datagram_endpoint_sock_sockopts(self): 1649 class FakeSock: 1650 type = socket.SOCK_DGRAM 1651 1652 fut = self.loop.create_datagram_endpoint( 1653 MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) 1654 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1655 1656 fut = self.loop.create_datagram_endpoint( 1657 MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) 1658 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1659 1660 fut = self.loop.create_datagram_endpoint( 1661 MyDatagramProto, family=1, sock=FakeSock()) 1662 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1663 1664 fut = self.loop.create_datagram_endpoint( 1665 MyDatagramProto, proto=1, sock=FakeSock()) 1666 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1667 1668 fut = self.loop.create_datagram_endpoint( 1669 MyDatagramProto, flags=1, sock=FakeSock()) 1670 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1671 1672 fut = self.loop.create_datagram_endpoint( 1673 MyDatagramProto, reuse_address=True, sock=FakeSock()) 1674 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1675 1676 fut = self.loop.create_datagram_endpoint( 1677 MyDatagramProto, reuse_port=True, sock=FakeSock()) 1678 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1679 1680 fut = self.loop.create_datagram_endpoint( 1681 MyDatagramProto, allow_broadcast=True, sock=FakeSock()) 1682 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1683 1684 def test_create_datagram_endpoint_sockopts(self): 1685 # Socket options should not be applied unless asked for. 1686 # SO_REUSEADDR defaults to on for UNIX. 1687 # SO_REUSEPORT is not available on all platforms. 1688 1689 coro = self.loop.create_datagram_endpoint( 1690 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1691 local_addr=('127.0.0.1', 0)) 1692 transport, protocol = self.loop.run_until_complete(coro) 1693 sock = transport.get_extra_info('socket') 1694 1695 reuse_address_default_on = ( 1696 os.name == 'posix' and sys.platform != 'cygwin') 1697 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1698 1699 if reuse_address_default_on: 1700 self.assertTrue( 1701 sock.getsockopt( 1702 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1703 else: 1704 self.assertFalse( 1705 sock.getsockopt( 1706 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1707 if reuseport_supported: 1708 self.assertFalse( 1709 sock.getsockopt( 1710 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1711 self.assertFalse( 1712 sock.getsockopt( 1713 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1714 1715 transport.close() 1716 self.loop.run_until_complete(protocol.done) 1717 self.assertEqual('CLOSED', protocol.state) 1718 1719 coro = self.loop.create_datagram_endpoint( 1720 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1721 local_addr=('127.0.0.1', 0), 1722 reuse_address=True, 1723 reuse_port=reuseport_supported, 1724 allow_broadcast=True) 1725 transport, protocol = self.loop.run_until_complete(coro) 1726 sock = transport.get_extra_info('socket') 1727 1728 self.assertTrue( 1729 sock.getsockopt( 1730 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1731 if reuseport_supported: 1732 self.assertTrue( 1733 sock.getsockopt( 1734 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1735 self.assertTrue( 1736 sock.getsockopt( 1737 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1738 1739 transport.close() 1740 self.loop.run_until_complete(protocol.done) 1741 self.assertEqual('CLOSED', protocol.state) 1742 1743 @patch_socket 1744 def test_create_datagram_endpoint_nosoreuseport(self, m_socket): 1745 del m_socket.SO_REUSEPORT 1746 m_socket.socket.return_value = mock.Mock() 1747 1748 coro = self.loop.create_datagram_endpoint( 1749 lambda: MyDatagramProto(loop=self.loop), 1750 local_addr=('127.0.0.1', 0), 1751 reuse_address=False, 1752 reuse_port=True) 1753 1754 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1755 1756 @patch_socket 1757 def test_create_datagram_endpoint_ip_addr(self, m_socket): 1758 def getaddrinfo(*args, **kw): 1759 self.fail('should not have called getaddrinfo') 1760 1761 m_socket.getaddrinfo = getaddrinfo 1762 m_socket.socket.return_value.bind = bind = mock.Mock() 1763 self.loop._add_reader = mock.Mock() 1764 self.loop._add_reader._is_coroutine = False 1765 1766 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1767 coro = self.loop.create_datagram_endpoint( 1768 lambda: MyDatagramProto(loop=self.loop), 1769 local_addr=('1.2.3.4', 0), 1770 reuse_address=False, 1771 reuse_port=reuseport_supported) 1772 1773 t, p = self.loop.run_until_complete(coro) 1774 try: 1775 bind.assert_called_with(('1.2.3.4', 0)) 1776 m_socket.socket.assert_called_with(family=m_socket.AF_INET, 1777 proto=m_socket.IPPROTO_UDP, 1778 type=m_socket.SOCK_DGRAM) 1779 finally: 1780 t.close() 1781 test_utils.run_briefly(self.loop) # allow transport to close 1782 1783 def test_accept_connection_retry(self): 1784 sock = mock.Mock() 1785 sock.accept.side_effect = BlockingIOError() 1786 1787 self.loop._accept_connection(MyProto, sock) 1788 self.assertFalse(sock.close.called) 1789 1790 @mock.patch('asyncio.base_events.logger') 1791 def test_accept_connection_exception(self, m_log): 1792 sock = mock.Mock() 1793 sock.fileno.return_value = 10 1794 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') 1795 self.loop._remove_reader = mock.Mock() 1796 self.loop.call_later = mock.Mock() 1797 1798 self.loop._accept_connection(MyProto, sock) 1799 self.assertTrue(m_log.error.called) 1800 self.assertFalse(sock.close.called) 1801 self.loop._remove_reader.assert_called_with(10) 1802 self.loop.call_later.assert_called_with( 1803 constants.ACCEPT_RETRY_DELAY, 1804 # self.loop._start_serving 1805 mock.ANY, 1806 MyProto, sock, None, None, mock.ANY, mock.ANY) 1807 1808 def test_call_coroutine(self): 1809 @asyncio.coroutine 1810 def simple_coroutine(): 1811 pass 1812 1813 self.loop.set_debug(True) 1814 coro_func = simple_coroutine 1815 coro_obj = coro_func() 1816 self.addCleanup(coro_obj.close) 1817 for func in (coro_func, coro_obj): 1818 with self.assertRaises(TypeError): 1819 self.loop.call_soon(func) 1820 with self.assertRaises(TypeError): 1821 self.loop.call_soon_threadsafe(func) 1822 with self.assertRaises(TypeError): 1823 self.loop.call_later(60, func) 1824 with self.assertRaises(TypeError): 1825 self.loop.call_at(self.loop.time() + 60, func) 1826 with self.assertRaises(TypeError): 1827 self.loop.run_until_complete( 1828 self.loop.run_in_executor(None, func)) 1829 1830 @mock.patch('asyncio.base_events.logger') 1831 def test_log_slow_callbacks(self, m_logger): 1832 def stop_loop_cb(loop): 1833 loop.stop() 1834 1835 @asyncio.coroutine 1836 def stop_loop_coro(loop): 1837 yield from () 1838 loop.stop() 1839 1840 asyncio.set_event_loop(self.loop) 1841 self.loop.set_debug(True) 1842 self.loop.slow_callback_duration = 0.0 1843 1844 # slow callback 1845 self.loop.call_soon(stop_loop_cb, self.loop) 1846 self.loop.run_forever() 1847 fmt, *args = m_logger.warning.call_args[0] 1848 self.assertRegex(fmt % tuple(args), 1849 "^Executing <Handle.*stop_loop_cb.*> " 1850 "took .* seconds$") 1851 1852 # slow task 1853 asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop) 1854 self.loop.run_forever() 1855 fmt, *args = m_logger.warning.call_args[0] 1856 self.assertRegex(fmt % tuple(args), 1857 "^Executing <Task.*stop_loop_coro.*> " 1858 "took .* seconds$") 1859 1860 1861class RunningLoopTests(unittest.TestCase): 1862 1863 def test_running_loop_within_a_loop(self): 1864 @asyncio.coroutine 1865 def runner(loop): 1866 loop.run_forever() 1867 1868 loop = asyncio.new_event_loop() 1869 outer_loop = asyncio.new_event_loop() 1870 try: 1871 with self.assertRaisesRegex(RuntimeError, 1872 'while another loop is running'): 1873 outer_loop.run_until_complete(runner(loop)) 1874 finally: 1875 loop.close() 1876 outer_loop.close() 1877 1878 1879class BaseLoopSockSendfileTests(test_utils.TestCase): 1880 1881 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 1882 1883 class MyProto(asyncio.Protocol): 1884 1885 def __init__(self, loop): 1886 self.started = False 1887 self.closed = False 1888 self.data = bytearray() 1889 self.fut = loop.create_future() 1890 self.transport = None 1891 1892 def connection_made(self, transport): 1893 self.started = True 1894 self.transport = transport 1895 1896 def data_received(self, data): 1897 self.data.extend(data) 1898 1899 def connection_lost(self, exc): 1900 self.closed = True 1901 self.fut.set_result(None) 1902 self.transport = None 1903 1904 async def wait_closed(self): 1905 await self.fut 1906 1907 @classmethod 1908 def setUpClass(cls): 1909 cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE 1910 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 1911 with open(support.TESTFN, 'wb') as fp: 1912 fp.write(cls.DATA) 1913 super().setUpClass() 1914 1915 @classmethod 1916 def tearDownClass(cls): 1917 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize 1918 support.unlink(support.TESTFN) 1919 super().tearDownClass() 1920 1921 def setUp(self): 1922 from asyncio.selector_events import BaseSelectorEventLoop 1923 # BaseSelectorEventLoop() has no native implementation 1924 self.loop = BaseSelectorEventLoop() 1925 self.set_event_loop(self.loop) 1926 self.file = open(support.TESTFN, 'rb') 1927 self.addCleanup(self.file.close) 1928 super().setUp() 1929 1930 def make_socket(self, blocking=False): 1931 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1932 sock.setblocking(blocking) 1933 self.addCleanup(sock.close) 1934 return sock 1935 1936 def run_loop(self, coro): 1937 return self.loop.run_until_complete(coro) 1938 1939 def prepare(self): 1940 sock = self.make_socket() 1941 proto = self.MyProto(self.loop) 1942 server = self.run_loop(self.loop.create_server( 1943 lambda: proto, support.HOST, 0, family=socket.AF_INET)) 1944 addr = server.sockets[0].getsockname() 1945 1946 for _ in range(10): 1947 try: 1948 self.run_loop(self.loop.sock_connect(sock, addr)) 1949 except OSError: 1950 self.run_loop(asyncio.sleep(0.5)) 1951 continue 1952 else: 1953 break 1954 else: 1955 # One last try, so we get the exception 1956 self.run_loop(self.loop.sock_connect(sock, addr)) 1957 1958 def cleanup(): 1959 server.close() 1960 self.run_loop(server.wait_closed()) 1961 sock.close() 1962 if proto.transport is not None: 1963 proto.transport.close() 1964 self.run_loop(proto.wait_closed()) 1965 1966 self.addCleanup(cleanup) 1967 1968 return sock, proto 1969 1970 def test__sock_sendfile_native_failure(self): 1971 sock, proto = self.prepare() 1972 1973 with self.assertRaisesRegex(events.SendfileNotAvailableError, 1974 "sendfile is not available"): 1975 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 1976 0, None)) 1977 1978 self.assertEqual(proto.data, b'') 1979 self.assertEqual(self.file.tell(), 0) 1980 1981 def test_sock_sendfile_no_fallback(self): 1982 sock, proto = self.prepare() 1983 1984 with self.assertRaisesRegex(events.SendfileNotAvailableError, 1985 "sendfile is not available"): 1986 self.run_loop(self.loop.sock_sendfile(sock, self.file, 1987 fallback=False)) 1988 1989 self.assertEqual(self.file.tell(), 0) 1990 self.assertEqual(proto.data, b'') 1991 1992 def test_sock_sendfile_fallback(self): 1993 sock, proto = self.prepare() 1994 1995 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) 1996 sock.close() 1997 self.run_loop(proto.wait_closed()) 1998 1999 self.assertEqual(ret, len(self.DATA)) 2000 self.assertEqual(self.file.tell(), len(self.DATA)) 2001 self.assertEqual(proto.data, self.DATA) 2002 2003 def test_sock_sendfile_fallback_offset_and_count(self): 2004 sock, proto = self.prepare() 2005 2006 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, 2007 1000, 2000)) 2008 sock.close() 2009 self.run_loop(proto.wait_closed()) 2010 2011 self.assertEqual(ret, 2000) 2012 self.assertEqual(self.file.tell(), 3000) 2013 self.assertEqual(proto.data, self.DATA[1000:3000]) 2014 2015 def test_blocking_socket(self): 2016 self.loop.set_debug(True) 2017 sock = self.make_socket(blocking=True) 2018 with self.assertRaisesRegex(ValueError, "must be non-blocking"): 2019 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2020 2021 def test_nonbinary_file(self): 2022 sock = self.make_socket() 2023 with open(support.TESTFN, 'r') as f: 2024 with self.assertRaisesRegex(ValueError, "binary mode"): 2025 self.run_loop(self.loop.sock_sendfile(sock, f)) 2026 2027 def test_nonstream_socket(self): 2028 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 2029 sock.setblocking(False) 2030 self.addCleanup(sock.close) 2031 with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"): 2032 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2033 2034 def test_notint_count(self): 2035 sock = self.make_socket() 2036 with self.assertRaisesRegex(TypeError, 2037 "count must be a positive integer"): 2038 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count')) 2039 2040 def test_negative_count(self): 2041 sock = self.make_socket() 2042 with self.assertRaisesRegex(ValueError, 2043 "count must be a positive integer"): 2044 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1)) 2045 2046 def test_notint_offset(self): 2047 sock = self.make_socket() 2048 with self.assertRaisesRegex(TypeError, 2049 "offset must be a non-negative integer"): 2050 self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset')) 2051 2052 def test_negative_offset(self): 2053 sock = self.make_socket() 2054 with self.assertRaisesRegex(ValueError, 2055 "offset must be a non-negative integer"): 2056 self.run_loop(self.loop.sock_sendfile(sock, self.file, -1)) 2057 2058 2059class TestSelectorUtils(test_utils.TestCase): 2060 def check_set_nodelay(self, sock): 2061 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2062 self.assertFalse(opt) 2063 2064 base_events._set_nodelay(sock) 2065 2066 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2067 self.assertTrue(opt) 2068 2069 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 2070 'need socket.TCP_NODELAY') 2071 def test_set_nodelay(self): 2072 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2073 proto=socket.IPPROTO_TCP) 2074 with sock: 2075 self.check_set_nodelay(sock) 2076 2077 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2078 proto=socket.IPPROTO_TCP) 2079 with sock: 2080 sock.setblocking(False) 2081 self.check_set_nodelay(sock) 2082 2083 2084 2085if __name__ == '__main__': 2086 unittest.main() 2087