1"""Tests for base_events.py""" 2 3import concurrent.futures 4import errno 5import math 6import socket 7import sys 8import threading 9import time 10import unittest 11from unittest import mock 12 13import asyncio 14from asyncio import base_events 15from asyncio import constants 16from test.test_asyncio import utils as test_utils 17from test import support 18from test.support.script_helper import assert_python_ok 19from test.support import os_helper 20from test.support import socket_helper 21 22 23MOCK_ANY = mock.ANY 24PY34 = sys.version_info >= (3, 4) 25 26 27def tearDownModule(): 28 asyncio.set_event_loop_policy(None) 29 30 31def mock_socket_module(): 32 m_socket = mock.MagicMock(spec=socket) 33 for name in ( 34 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP', 35 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton' 36 ): 37 if hasattr(socket, name): 38 setattr(m_socket, name, getattr(socket, name)) 39 else: 40 delattr(m_socket, name) 41 42 m_socket.socket = mock.MagicMock() 43 m_socket.socket.return_value = test_utils.mock_nonblocking_socket() 44 m_socket.getaddrinfo._is_coroutine = False 45 46 return m_socket 47 48 49def patch_socket(f): 50 return mock.patch('asyncio.base_events.socket', 51 new_callable=mock_socket_module)(f) 52 53 54class BaseEventTests(test_utils.TestCase): 55 56 def test_ipaddr_info(self): 57 UNSPEC = socket.AF_UNSPEC 58 INET = socket.AF_INET 59 INET6 = socket.AF_INET6 60 STREAM = socket.SOCK_STREAM 61 DGRAM = socket.SOCK_DGRAM 62 TCP = socket.IPPROTO_TCP 63 UDP = socket.IPPROTO_UDP 64 65 self.assertEqual( 66 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 67 base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP)) 68 69 self.assertEqual( 70 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 71 base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP)) 72 73 self.assertEqual( 74 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 75 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP)) 76 77 self.assertEqual( 78 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 79 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP)) 80 81 # Socket type STREAM implies TCP protocol. 82 self.assertEqual( 83 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 84 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0)) 85 86 # Socket type DGRAM implies UDP protocol. 87 self.assertEqual( 88 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 89 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0)) 90 91 # No socket type. 92 self.assertIsNone( 93 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) 94 95 if socket_helper.IPV6_ENABLED: 96 # IPv4 address with family IPv6. 97 self.assertIsNone( 98 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) 99 100 self.assertEqual( 101 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 102 base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP)) 103 104 self.assertEqual( 105 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 106 base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP)) 107 108 # IPv6 address with family IPv4. 109 self.assertIsNone( 110 base_events._ipaddr_info('::3', 1, INET, STREAM, TCP)) 111 112 # IPv6 address with zone index. 113 self.assertIsNone( 114 base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP)) 115 116 def test_port_parameter_types(self): 117 # Test obscure kinds of arguments for "port". 118 INET = socket.AF_INET 119 STREAM = socket.SOCK_STREAM 120 TCP = socket.IPPROTO_TCP 121 122 self.assertEqual( 123 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 124 base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP)) 125 126 self.assertEqual( 127 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 128 base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP)) 129 130 self.assertEqual( 131 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 132 base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP)) 133 134 self.assertEqual( 135 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 136 base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP)) 137 138 self.assertEqual( 139 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 140 base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) 141 142 @patch_socket 143 def test_ipaddr_info_no_inet_pton(self, m_socket): 144 del m_socket.inet_pton 145 self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1, 146 socket.AF_INET, 147 socket.SOCK_STREAM, 148 socket.IPPROTO_TCP)) 149 150 151class BaseEventLoopTests(test_utils.TestCase): 152 153 def setUp(self): 154 super().setUp() 155 self.loop = base_events.BaseEventLoop() 156 self.loop._selector = mock.Mock() 157 self.loop._selector.select.return_value = () 158 self.set_event_loop(self.loop) 159 160 def test_not_implemented(self): 161 m = mock.Mock() 162 self.assertRaises( 163 NotImplementedError, 164 self.loop._make_socket_transport, m, m) 165 self.assertRaises( 166 NotImplementedError, 167 self.loop._make_ssl_transport, m, m, m, m) 168 self.assertRaises( 169 NotImplementedError, 170 self.loop._make_datagram_transport, m, m) 171 self.assertRaises( 172 NotImplementedError, self.loop._process_events, []) 173 self.assertRaises( 174 NotImplementedError, self.loop._write_to_self) 175 self.assertRaises( 176 NotImplementedError, 177 self.loop._make_read_pipe_transport, m, m) 178 self.assertRaises( 179 NotImplementedError, 180 self.loop._make_write_pipe_transport, m, m) 181 gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m) 182 with self.assertRaises(NotImplementedError): 183 gen.send(None) 184 185 def test_close(self): 186 self.assertFalse(self.loop.is_closed()) 187 self.loop.close() 188 self.assertTrue(self.loop.is_closed()) 189 190 # it should be possible to call close() more than once 191 self.loop.close() 192 self.loop.close() 193 194 # operation blocked when the loop is closed 195 f = self.loop.create_future() 196 self.assertRaises(RuntimeError, self.loop.run_forever) 197 self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 198 199 def test__add_callback_handle(self): 200 h = asyncio.Handle(lambda: False, (), self.loop, None) 201 202 self.loop._add_callback(h) 203 self.assertFalse(self.loop._scheduled) 204 self.assertIn(h, self.loop._ready) 205 206 def test__add_callback_cancelled_handle(self): 207 h = asyncio.Handle(lambda: False, (), self.loop, None) 208 h.cancel() 209 210 self.loop._add_callback(h) 211 self.assertFalse(self.loop._scheduled) 212 self.assertFalse(self.loop._ready) 213 214 def test_set_default_executor(self): 215 class DummyExecutor(concurrent.futures.ThreadPoolExecutor): 216 def submit(self, fn, *args, **kwargs): 217 raise NotImplementedError( 218 'cannot submit into a dummy executor') 219 220 self.loop._process_events = mock.Mock() 221 self.loop._write_to_self = mock.Mock() 222 223 executor = DummyExecutor() 224 self.loop.set_default_executor(executor) 225 self.assertIs(executor, self.loop._default_executor) 226 227 def test_set_default_executor_deprecation_warnings(self): 228 executor = mock.Mock() 229 230 with self.assertWarns(DeprecationWarning): 231 self.loop.set_default_executor(executor) 232 233 # Avoid cleaning up the executor mock 234 self.loop._default_executor = None 235 236 def test_call_soon(self): 237 def cb(): 238 pass 239 240 h = self.loop.call_soon(cb) 241 self.assertEqual(h._callback, cb) 242 self.assertIsInstance(h, asyncio.Handle) 243 self.assertIn(h, self.loop._ready) 244 245 def test_call_soon_non_callable(self): 246 self.loop.set_debug(True) 247 with self.assertRaisesRegex(TypeError, 'a callable object'): 248 self.loop.call_soon(1) 249 250 def test_call_later(self): 251 def cb(): 252 pass 253 254 h = self.loop.call_later(10.0, cb) 255 self.assertIsInstance(h, asyncio.TimerHandle) 256 self.assertIn(h, self.loop._scheduled) 257 self.assertNotIn(h, self.loop._ready) 258 259 def test_call_later_negative_delays(self): 260 calls = [] 261 262 def cb(arg): 263 calls.append(arg) 264 265 self.loop._process_events = mock.Mock() 266 self.loop.call_later(-1, cb, 'a') 267 self.loop.call_later(-2, cb, 'b') 268 test_utils.run_briefly(self.loop) 269 self.assertEqual(calls, ['b', 'a']) 270 271 def test_time_and_call_at(self): 272 def cb(): 273 self.loop.stop() 274 275 self.loop._process_events = mock.Mock() 276 delay = 0.1 277 278 when = self.loop.time() + delay 279 self.loop.call_at(when, cb) 280 t0 = self.loop.time() 281 self.loop.run_forever() 282 dt = self.loop.time() - t0 283 284 # 50 ms: maximum granularity of the event loop 285 self.assertGreaterEqual(dt, delay - 0.050, dt) 286 # tolerate a difference of +800 ms because some Python buildbots 287 # are really slow 288 self.assertLessEqual(dt, 0.9, dt) 289 290 def check_thread(self, loop, debug): 291 def cb(): 292 pass 293 294 loop.set_debug(debug) 295 if debug: 296 msg = ("Non-thread-safe operation invoked on an event loop other " 297 "than the current one") 298 with self.assertRaisesRegex(RuntimeError, msg): 299 loop.call_soon(cb) 300 with self.assertRaisesRegex(RuntimeError, msg): 301 loop.call_later(60, cb) 302 with self.assertRaisesRegex(RuntimeError, msg): 303 loop.call_at(loop.time() + 60, cb) 304 else: 305 loop.call_soon(cb) 306 loop.call_later(60, cb) 307 loop.call_at(loop.time() + 60, cb) 308 309 def test_check_thread(self): 310 def check_in_thread(loop, event, debug, create_loop, fut): 311 # wait until the event loop is running 312 event.wait() 313 314 try: 315 if create_loop: 316 loop2 = base_events.BaseEventLoop() 317 try: 318 asyncio.set_event_loop(loop2) 319 self.check_thread(loop, debug) 320 finally: 321 asyncio.set_event_loop(None) 322 loop2.close() 323 else: 324 self.check_thread(loop, debug) 325 except Exception as exc: 326 loop.call_soon_threadsafe(fut.set_exception, exc) 327 else: 328 loop.call_soon_threadsafe(fut.set_result, None) 329 330 def test_thread(loop, debug, create_loop=False): 331 event = threading.Event() 332 fut = loop.create_future() 333 loop.call_soon(event.set) 334 args = (loop, event, debug, create_loop, fut) 335 thread = threading.Thread(target=check_in_thread, args=args) 336 thread.start() 337 loop.run_until_complete(fut) 338 thread.join() 339 340 self.loop._process_events = mock.Mock() 341 self.loop._write_to_self = mock.Mock() 342 343 # raise RuntimeError if the thread has no event loop 344 test_thread(self.loop, True) 345 346 # check disabled if debug mode is disabled 347 test_thread(self.loop, False) 348 349 # raise RuntimeError if the event loop of the thread is not the called 350 # event loop 351 test_thread(self.loop, True, create_loop=True) 352 353 # check disabled if debug mode is disabled 354 test_thread(self.loop, False, create_loop=True) 355 356 def test__run_once(self): 357 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 358 self.loop, None) 359 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 360 self.loop, None) 361 362 h1.cancel() 363 364 self.loop._process_events = mock.Mock() 365 self.loop._scheduled.append(h1) 366 self.loop._scheduled.append(h2) 367 self.loop._run_once() 368 369 t = self.loop._selector.select.call_args[0][0] 370 self.assertTrue(9.5 < t < 10.5, t) 371 self.assertEqual([h2], self.loop._scheduled) 372 self.assertTrue(self.loop._process_events.called) 373 374 def test_set_debug(self): 375 self.loop.set_debug(True) 376 self.assertTrue(self.loop.get_debug()) 377 self.loop.set_debug(False) 378 self.assertFalse(self.loop.get_debug()) 379 380 def test__run_once_schedule_handle(self): 381 handle = None 382 processed = False 383 384 def cb(loop): 385 nonlocal processed, handle 386 processed = True 387 handle = loop.call_soon(lambda: True) 388 389 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 390 self.loop, None) 391 392 self.loop._process_events = mock.Mock() 393 self.loop._scheduled.append(h) 394 self.loop._run_once() 395 396 self.assertTrue(processed) 397 self.assertEqual([handle], list(self.loop._ready)) 398 399 def test__run_once_cancelled_event_cleanup(self): 400 self.loop._process_events = mock.Mock() 401 402 self.assertTrue( 403 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 404 405 def cb(): 406 pass 407 408 # Set up one "blocking" event that will not be cancelled to 409 # ensure later cancelled events do not make it to the head 410 # of the queue and get cleaned. 411 not_cancelled_count = 1 412 self.loop.call_later(3000, cb) 413 414 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 415 # cancelled handles, ensure they aren't removed 416 417 cancelled_count = 2 418 for x in range(2): 419 h = self.loop.call_later(3600, cb) 420 h.cancel() 421 422 # Add some cancelled events that will be at head and removed 423 cancelled_count += 2 424 for x in range(2): 425 h = self.loop.call_later(100, cb) 426 h.cancel() 427 428 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 429 self.assertLessEqual(cancelled_count + not_cancelled_count, 430 base_events._MIN_SCHEDULED_TIMER_HANDLES) 431 432 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 433 434 self.loop._run_once() 435 436 cancelled_count -= 2 437 438 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 439 440 self.assertEqual(len(self.loop._scheduled), 441 cancelled_count + not_cancelled_count) 442 443 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 444 # so that deletion of cancelled events will occur on next _run_once 445 add_cancel_count = int(math.ceil( 446 base_events._MIN_SCHEDULED_TIMER_HANDLES * 447 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 448 449 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 450 add_cancel_count, 0) 451 452 # Add some events that will not be cancelled 453 not_cancelled_count += add_not_cancel_count 454 for x in range(add_not_cancel_count): 455 self.loop.call_later(3600, cb) 456 457 # Add enough cancelled events 458 cancelled_count += add_cancel_count 459 for x in range(add_cancel_count): 460 h = self.loop.call_later(3600, cb) 461 h.cancel() 462 463 # Ensure all handles are still scheduled 464 self.assertEqual(len(self.loop._scheduled), 465 cancelled_count + not_cancelled_count) 466 467 self.loop._run_once() 468 469 # Ensure cancelled events were removed 470 self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 471 472 # Ensure only uncancelled events remain scheduled 473 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 474 475 def test_run_until_complete_type_error(self): 476 self.assertRaises(TypeError, 477 self.loop.run_until_complete, 'blah') 478 479 def test_run_until_complete_loop(self): 480 task = self.loop.create_future() 481 other_loop = self.new_test_loop() 482 self.addCleanup(other_loop.close) 483 self.assertRaises(ValueError, 484 other_loop.run_until_complete, task) 485 486 def test_run_until_complete_loop_orphan_future_close_loop(self): 487 class ShowStopper(SystemExit): 488 pass 489 490 async def foo(delay): 491 await asyncio.sleep(delay) 492 493 def throw(): 494 raise ShowStopper 495 496 self.loop._process_events = mock.Mock() 497 self.loop.call_soon(throw) 498 with self.assertRaises(ShowStopper): 499 self.loop.run_until_complete(foo(0.1)) 500 501 # This call fails if run_until_complete does not clean up 502 # done-callback for the previous future. 503 self.loop.run_until_complete(foo(0.2)) 504 505 def test_subprocess_exec_invalid_args(self): 506 args = [sys.executable, '-c', 'pass'] 507 508 # missing program parameter (empty args) 509 self.assertRaises(TypeError, 510 self.loop.run_until_complete, self.loop.subprocess_exec, 511 asyncio.SubprocessProtocol) 512 513 # expected multiple arguments, not a list 514 self.assertRaises(TypeError, 515 self.loop.run_until_complete, self.loop.subprocess_exec, 516 asyncio.SubprocessProtocol, args) 517 518 # program arguments must be strings, not int 519 self.assertRaises(TypeError, 520 self.loop.run_until_complete, self.loop.subprocess_exec, 521 asyncio.SubprocessProtocol, sys.executable, 123) 522 523 # universal_newlines, shell, bufsize must not be set 524 self.assertRaises(TypeError, 525 self.loop.run_until_complete, self.loop.subprocess_exec, 526 asyncio.SubprocessProtocol, *args, universal_newlines=True) 527 self.assertRaises(TypeError, 528 self.loop.run_until_complete, self.loop.subprocess_exec, 529 asyncio.SubprocessProtocol, *args, shell=True) 530 self.assertRaises(TypeError, 531 self.loop.run_until_complete, self.loop.subprocess_exec, 532 asyncio.SubprocessProtocol, *args, bufsize=4096) 533 534 def test_subprocess_shell_invalid_args(self): 535 # expected a string, not an int or a list 536 self.assertRaises(TypeError, 537 self.loop.run_until_complete, self.loop.subprocess_shell, 538 asyncio.SubprocessProtocol, 123) 539 self.assertRaises(TypeError, 540 self.loop.run_until_complete, self.loop.subprocess_shell, 541 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 542 543 # universal_newlines, shell, bufsize must not be set 544 self.assertRaises(TypeError, 545 self.loop.run_until_complete, self.loop.subprocess_shell, 546 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 547 self.assertRaises(TypeError, 548 self.loop.run_until_complete, self.loop.subprocess_shell, 549 asyncio.SubprocessProtocol, 'exit 0', shell=True) 550 self.assertRaises(TypeError, 551 self.loop.run_until_complete, self.loop.subprocess_shell, 552 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 553 554 def test_default_exc_handler_callback(self): 555 self.loop._process_events = mock.Mock() 556 557 def zero_error(fut): 558 fut.set_result(True) 559 1/0 560 561 # Test call_soon (events.Handle) 562 with mock.patch('asyncio.base_events.logger') as log: 563 fut = self.loop.create_future() 564 self.loop.call_soon(zero_error, fut) 565 fut.add_done_callback(lambda fut: self.loop.stop()) 566 self.loop.run_forever() 567 log.error.assert_called_with( 568 test_utils.MockPattern('Exception in callback.*zero'), 569 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 570 571 # Test call_later (events.TimerHandle) 572 with mock.patch('asyncio.base_events.logger') as log: 573 fut = self.loop.create_future() 574 self.loop.call_later(0.01, zero_error, fut) 575 fut.add_done_callback(lambda fut: self.loop.stop()) 576 self.loop.run_forever() 577 log.error.assert_called_with( 578 test_utils.MockPattern('Exception in callback.*zero'), 579 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 580 581 def test_default_exc_handler_coro(self): 582 self.loop._process_events = mock.Mock() 583 584 async def zero_error_coro(): 585 await asyncio.sleep(0.01) 586 1/0 587 588 # Test Future.__del__ 589 with mock.patch('asyncio.base_events.logger') as log: 590 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 591 fut.add_done_callback(lambda *args: self.loop.stop()) 592 self.loop.run_forever() 593 fut = None # Trigger Future.__del__ or futures._TracebackLogger 594 support.gc_collect() 595 if PY34: 596 # Future.__del__ in Python 3.4 logs error with 597 # an actual exception context 598 log.error.assert_called_with( 599 test_utils.MockPattern('.*exception was never retrieved'), 600 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 601 else: 602 # futures._TracebackLogger logs only textual traceback 603 log.error.assert_called_with( 604 test_utils.MockPattern( 605 '.*exception was never retrieved.*ZeroDiv'), 606 exc_info=False) 607 608 def test_set_exc_handler_invalid(self): 609 with self.assertRaisesRegex(TypeError, 'A callable object or None'): 610 self.loop.set_exception_handler('spam') 611 612 def test_set_exc_handler_custom(self): 613 def zero_error(): 614 1/0 615 616 def run_loop(): 617 handle = self.loop.call_soon(zero_error) 618 self.loop._run_once() 619 return handle 620 621 self.loop.set_debug(True) 622 self.loop._process_events = mock.Mock() 623 624 self.assertIsNone(self.loop.get_exception_handler()) 625 mock_handler = mock.Mock() 626 self.loop.set_exception_handler(mock_handler) 627 self.assertIs(self.loop.get_exception_handler(), mock_handler) 628 handle = run_loop() 629 mock_handler.assert_called_with(self.loop, { 630 'exception': MOCK_ANY, 631 'message': test_utils.MockPattern( 632 'Exception in callback.*zero_error'), 633 'handle': handle, 634 'source_traceback': handle._source_traceback, 635 }) 636 mock_handler.reset_mock() 637 638 self.loop.set_exception_handler(None) 639 with mock.patch('asyncio.base_events.logger') as log: 640 run_loop() 641 log.error.assert_called_with( 642 test_utils.MockPattern( 643 'Exception in callback.*zero'), 644 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 645 646 self.assertFalse(mock_handler.called) 647 648 def test_set_exc_handler_broken(self): 649 def run_loop(): 650 def zero_error(): 651 1/0 652 self.loop.call_soon(zero_error) 653 self.loop._run_once() 654 655 def handler(loop, context): 656 raise AttributeError('spam') 657 658 self.loop._process_events = mock.Mock() 659 660 self.loop.set_exception_handler(handler) 661 662 with mock.patch('asyncio.base_events.logger') as log: 663 run_loop() 664 log.error.assert_called_with( 665 test_utils.MockPattern( 666 'Unhandled error in exception handler'), 667 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 668 669 def test_default_exc_handler_broken(self): 670 _context = None 671 672 class Loop(base_events.BaseEventLoop): 673 674 _selector = mock.Mock() 675 _process_events = mock.Mock() 676 677 def default_exception_handler(self, context): 678 nonlocal _context 679 _context = context 680 # Simulates custom buggy "default_exception_handler" 681 raise ValueError('spam') 682 683 loop = Loop() 684 self.addCleanup(loop.close) 685 asyncio.set_event_loop(loop) 686 687 def run_loop(): 688 def zero_error(): 689 1/0 690 loop.call_soon(zero_error) 691 loop._run_once() 692 693 with mock.patch('asyncio.base_events.logger') as log: 694 run_loop() 695 log.error.assert_called_with( 696 'Exception in default exception handler', 697 exc_info=True) 698 699 def custom_handler(loop, context): 700 raise ValueError('ham') 701 702 _context = None 703 loop.set_exception_handler(custom_handler) 704 with mock.patch('asyncio.base_events.logger') as log: 705 run_loop() 706 log.error.assert_called_with( 707 test_utils.MockPattern('Exception in default exception.*' 708 'while handling.*in custom'), 709 exc_info=True) 710 711 # Check that original context was passed to default 712 # exception handler. 713 self.assertIn('context', _context) 714 self.assertIs(type(_context['context']['exception']), 715 ZeroDivisionError) 716 717 def test_set_task_factory_invalid(self): 718 with self.assertRaisesRegex( 719 TypeError, 'task factory must be a callable or None'): 720 721 self.loop.set_task_factory(1) 722 723 self.assertIsNone(self.loop.get_task_factory()) 724 725 def test_set_task_factory(self): 726 self.loop._process_events = mock.Mock() 727 728 class MyTask(asyncio.Task): 729 pass 730 731 async def coro(): 732 pass 733 734 factory = lambda loop, coro: MyTask(coro, loop=loop) 735 736 self.assertIsNone(self.loop.get_task_factory()) 737 self.loop.set_task_factory(factory) 738 self.assertIs(self.loop.get_task_factory(), factory) 739 740 task = self.loop.create_task(coro()) 741 self.assertTrue(isinstance(task, MyTask)) 742 self.loop.run_until_complete(task) 743 744 self.loop.set_task_factory(None) 745 self.assertIsNone(self.loop.get_task_factory()) 746 747 task = self.loop.create_task(coro()) 748 self.assertTrue(isinstance(task, asyncio.Task)) 749 self.assertFalse(isinstance(task, MyTask)) 750 self.loop.run_until_complete(task) 751 752 def test_env_var_debug(self): 753 code = '\n'.join(( 754 'import asyncio', 755 'loop = asyncio.get_event_loop()', 756 'print(loop.get_debug())')) 757 758 # Test with -E to not fail if the unit test was run with 759 # PYTHONASYNCIODEBUG set to a non-empty string 760 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 761 self.assertEqual(stdout.rstrip(), b'False') 762 763 sts, stdout, stderr = assert_python_ok('-c', code, 764 PYTHONASYNCIODEBUG='', 765 PYTHONDEVMODE='') 766 self.assertEqual(stdout.rstrip(), b'False') 767 768 sts, stdout, stderr = assert_python_ok('-c', code, 769 PYTHONASYNCIODEBUG='1', 770 PYTHONDEVMODE='') 771 self.assertEqual(stdout.rstrip(), b'True') 772 773 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 774 PYTHONASYNCIODEBUG='1') 775 self.assertEqual(stdout.rstrip(), b'False') 776 777 # -X dev 778 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 779 '-c', code) 780 self.assertEqual(stdout.rstrip(), b'True') 781 782 def test_create_task(self): 783 class MyTask(asyncio.Task): 784 pass 785 786 async def test(): 787 pass 788 789 class EventLoop(base_events.BaseEventLoop): 790 def create_task(self, coro): 791 return MyTask(coro, loop=loop) 792 793 loop = EventLoop() 794 self.set_event_loop(loop) 795 796 coro = test() 797 task = asyncio.ensure_future(coro, loop=loop) 798 self.assertIsInstance(task, MyTask) 799 800 # make warnings quiet 801 task._log_destroy_pending = False 802 coro.close() 803 804 def test_create_named_task_with_default_factory(self): 805 async def test(): 806 pass 807 808 loop = asyncio.new_event_loop() 809 task = loop.create_task(test(), name='test_task') 810 try: 811 self.assertEqual(task.get_name(), 'test_task') 812 finally: 813 loop.run_until_complete(task) 814 loop.close() 815 816 def test_create_named_task_with_custom_factory(self): 817 def task_factory(loop, coro): 818 return asyncio.Task(coro, loop=loop) 819 820 async def test(): 821 pass 822 823 loop = asyncio.new_event_loop() 824 loop.set_task_factory(task_factory) 825 task = loop.create_task(test(), name='test_task') 826 try: 827 self.assertEqual(task.get_name(), 'test_task') 828 finally: 829 loop.run_until_complete(task) 830 loop.close() 831 832 def test_run_forever_keyboard_interrupt(self): 833 # Python issue #22601: ensure that the temporary task created by 834 # run_forever() consumes the KeyboardInterrupt and so don't log 835 # a warning 836 async def raise_keyboard_interrupt(): 837 raise KeyboardInterrupt 838 839 self.loop._process_events = mock.Mock() 840 self.loop.call_exception_handler = mock.Mock() 841 842 try: 843 self.loop.run_until_complete(raise_keyboard_interrupt()) 844 except KeyboardInterrupt: 845 pass 846 self.loop.close() 847 support.gc_collect() 848 849 self.assertFalse(self.loop.call_exception_handler.called) 850 851 def test_run_until_complete_baseexception(self): 852 # Python issue #22429: run_until_complete() must not schedule a pending 853 # call to stop() if the future raised a BaseException 854 async def raise_keyboard_interrupt(): 855 raise KeyboardInterrupt 856 857 self.loop._process_events = mock.Mock() 858 859 try: 860 self.loop.run_until_complete(raise_keyboard_interrupt()) 861 except KeyboardInterrupt: 862 pass 863 864 def func(): 865 self.loop.stop() 866 func.called = True 867 func.called = False 868 try: 869 self.loop.call_soon(func) 870 self.loop.run_forever() 871 except KeyboardInterrupt: 872 pass 873 self.assertTrue(func.called) 874 875 def test_single_selecter_event_callback_after_stopping(self): 876 # Python issue #25593: A stopped event loop may cause event callbacks 877 # to run more than once. 878 event_sentinel = object() 879 callcount = 0 880 doer = None 881 882 def proc_events(event_list): 883 nonlocal doer 884 if event_sentinel in event_list: 885 doer = self.loop.call_soon(do_event) 886 887 def do_event(): 888 nonlocal callcount 889 callcount += 1 890 self.loop.call_soon(clear_selector) 891 892 def clear_selector(): 893 doer.cancel() 894 self.loop._selector.select.return_value = () 895 896 self.loop._process_events = proc_events 897 self.loop._selector.select.return_value = (event_sentinel,) 898 899 for i in range(1, 3): 900 with self.subTest('Loop %d/2' % i): 901 self.loop.call_soon(self.loop.stop) 902 self.loop.run_forever() 903 self.assertEqual(callcount, 1) 904 905 def test_run_once(self): 906 # Simple test for test_utils.run_once(). It may seem strange 907 # to have a test for this (the function isn't even used!) but 908 # it's a de-factor standard API for library tests. This tests 909 # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 910 count = 0 911 912 def callback(): 913 nonlocal count 914 count += 1 915 916 self.loop._process_events = mock.Mock() 917 self.loop.call_soon(callback) 918 test_utils.run_once(self.loop) 919 self.assertEqual(count, 1) 920 921 def test_run_forever_pre_stopped(self): 922 # Test that the old idiom for pre-stopping the loop works. 923 self.loop._process_events = mock.Mock() 924 self.loop.stop() 925 self.loop.run_forever() 926 self.loop._selector.select.assert_called_once_with(0) 927 928 async def leave_unfinalized_asyncgen(self): 929 # Create an async generator, iterate it partially, and leave it 930 # to be garbage collected. 931 # Used in async generator finalization tests. 932 # Depends on implementation details of garbage collector. Changes 933 # in gc may break this function. 934 status = {'started': False, 935 'stopped': False, 936 'finalized': False} 937 938 async def agen(): 939 status['started'] = True 940 try: 941 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 942 yield item 943 finally: 944 status['finalized'] = True 945 946 ag = agen() 947 ai = ag.__aiter__() 948 949 async def iter_one(): 950 try: 951 item = await ai.__anext__() 952 except StopAsyncIteration: 953 return 954 if item == 'THREE': 955 status['stopped'] = True 956 return 957 asyncio.create_task(iter_one()) 958 959 asyncio.create_task(iter_one()) 960 return status 961 962 def test_asyncgen_finalization_by_gc(self): 963 # Async generators should be finalized when garbage collected. 964 self.loop._process_events = mock.Mock() 965 self.loop._write_to_self = mock.Mock() 966 with support.disable_gc(): 967 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 968 while not status['stopped']: 969 test_utils.run_briefly(self.loop) 970 self.assertTrue(status['started']) 971 self.assertTrue(status['stopped']) 972 self.assertFalse(status['finalized']) 973 support.gc_collect() 974 test_utils.run_briefly(self.loop) 975 self.assertTrue(status['finalized']) 976 977 def test_asyncgen_finalization_by_gc_in_other_thread(self): 978 # Python issue 34769: If garbage collector runs in another 979 # thread, async generators will not finalize in debug 980 # mode. 981 self.loop._process_events = mock.Mock() 982 self.loop._write_to_self = mock.Mock() 983 self.loop.set_debug(True) 984 with support.disable_gc(): 985 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 986 while not status['stopped']: 987 test_utils.run_briefly(self.loop) 988 self.assertTrue(status['started']) 989 self.assertTrue(status['stopped']) 990 self.assertFalse(status['finalized']) 991 self.loop.run_until_complete( 992 self.loop.run_in_executor(None, support.gc_collect)) 993 test_utils.run_briefly(self.loop) 994 self.assertTrue(status['finalized']) 995 996 997class MyProto(asyncio.Protocol): 998 done = None 999 1000 def __init__(self, create_future=False): 1001 self.state = 'INITIAL' 1002 self.nbytes = 0 1003 if create_future: 1004 self.done = asyncio.get_running_loop().create_future() 1005 1006 def _assert_state(self, *expected): 1007 if self.state not in expected: 1008 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 1009 1010 def connection_made(self, transport): 1011 self.transport = transport 1012 self._assert_state('INITIAL') 1013 self.state = 'CONNECTED' 1014 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 1015 1016 def data_received(self, data): 1017 self._assert_state('CONNECTED') 1018 self.nbytes += len(data) 1019 1020 def eof_received(self): 1021 self._assert_state('CONNECTED') 1022 self.state = 'EOF' 1023 1024 def connection_lost(self, exc): 1025 self._assert_state('CONNECTED', 'EOF') 1026 self.state = 'CLOSED' 1027 if self.done: 1028 self.done.set_result(None) 1029 1030 1031class MyDatagramProto(asyncio.DatagramProtocol): 1032 done = None 1033 1034 def __init__(self, create_future=False, loop=None): 1035 self.state = 'INITIAL' 1036 self.nbytes = 0 1037 if create_future: 1038 self.done = loop.create_future() 1039 1040 def _assert_state(self, expected): 1041 if self.state != expected: 1042 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 1043 1044 def connection_made(self, transport): 1045 self.transport = transport 1046 self._assert_state('INITIAL') 1047 self.state = 'INITIALIZED' 1048 1049 def datagram_received(self, data, addr): 1050 self._assert_state('INITIALIZED') 1051 self.nbytes += len(data) 1052 1053 def error_received(self, exc): 1054 self._assert_state('INITIALIZED') 1055 1056 def connection_lost(self, exc): 1057 self._assert_state('INITIALIZED') 1058 self.state = 'CLOSED' 1059 if self.done: 1060 self.done.set_result(None) 1061 1062 1063class BaseEventLoopWithSelectorTests(test_utils.TestCase): 1064 1065 def setUp(self): 1066 super().setUp() 1067 self.loop = asyncio.SelectorEventLoop() 1068 self.set_event_loop(self.loop) 1069 1070 @mock.patch('socket.getnameinfo') 1071 def test_getnameinfo(self, m_gai): 1072 m_gai.side_effect = lambda *args: 42 1073 r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123))) 1074 self.assertEqual(r, 42) 1075 1076 @patch_socket 1077 def test_create_connection_multiple_errors(self, m_socket): 1078 1079 class MyProto(asyncio.Protocol): 1080 pass 1081 1082 async def getaddrinfo(*args, **kw): 1083 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1084 (2, 1, 6, '', ('107.6.106.82', 80))] 1085 1086 def getaddrinfo_task(*args, **kwds): 1087 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1088 1089 idx = -1 1090 errors = ['err1', 'err2'] 1091 1092 def _socket(*args, **kw): 1093 nonlocal idx, errors 1094 idx += 1 1095 raise OSError(errors[idx]) 1096 1097 m_socket.socket = _socket 1098 1099 self.loop.getaddrinfo = getaddrinfo_task 1100 1101 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1102 with self.assertRaises(OSError) as cm: 1103 self.loop.run_until_complete(coro) 1104 1105 self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') 1106 1107 @patch_socket 1108 def test_create_connection_timeout(self, m_socket): 1109 # Ensure that the socket is closed on timeout 1110 sock = mock.Mock() 1111 m_socket.socket.return_value = sock 1112 1113 def getaddrinfo(*args, **kw): 1114 fut = self.loop.create_future() 1115 addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '', 1116 ('127.0.0.1', 80)) 1117 fut.set_result([addr]) 1118 return fut 1119 self.loop.getaddrinfo = getaddrinfo 1120 1121 with mock.patch.object(self.loop, 'sock_connect', 1122 side_effect=asyncio.TimeoutError): 1123 coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) 1124 with self.assertRaises(asyncio.TimeoutError): 1125 self.loop.run_until_complete(coro) 1126 self.assertTrue(sock.close.called) 1127 1128 def test_create_connection_host_port_sock(self): 1129 coro = self.loop.create_connection( 1130 MyProto, 'example.com', 80, sock=object()) 1131 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1132 1133 def test_create_connection_wrong_sock(self): 1134 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1135 with sock: 1136 coro = self.loop.create_connection(MyProto, sock=sock) 1137 with self.assertRaisesRegex(ValueError, 1138 'A Stream Socket was expected'): 1139 self.loop.run_until_complete(coro) 1140 1141 def test_create_server_wrong_sock(self): 1142 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1143 with sock: 1144 coro = self.loop.create_server(MyProto, sock=sock) 1145 with self.assertRaisesRegex(ValueError, 1146 'A Stream Socket was expected'): 1147 self.loop.run_until_complete(coro) 1148 1149 def test_create_server_ssl_timeout_for_plain_socket(self): 1150 coro = self.loop.create_server( 1151 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1152 with self.assertRaisesRegex( 1153 ValueError, 1154 'ssl_handshake_timeout is only meaningful with ssl'): 1155 self.loop.run_until_complete(coro) 1156 1157 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 1158 'no socket.SOCK_NONBLOCK (linux only)') 1159 def test_create_server_stream_bittype(self): 1160 sock = socket.socket( 1161 socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 1162 with sock: 1163 coro = self.loop.create_server(lambda: None, sock=sock) 1164 srv = self.loop.run_until_complete(coro) 1165 srv.close() 1166 self.loop.run_until_complete(srv.wait_closed()) 1167 1168 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') 1169 def test_create_server_ipv6(self): 1170 async def main(): 1171 srv = await asyncio.start_server(lambda: None, '::1', 0) 1172 try: 1173 self.assertGreater(len(srv.sockets), 0) 1174 finally: 1175 srv.close() 1176 await srv.wait_closed() 1177 1178 try: 1179 self.loop.run_until_complete(main()) 1180 except OSError as ex: 1181 if (hasattr(errno, 'EADDRNOTAVAIL') and 1182 ex.errno == errno.EADDRNOTAVAIL): 1183 self.skipTest('failed to bind to ::1') 1184 else: 1185 raise 1186 1187 def test_create_datagram_endpoint_wrong_sock(self): 1188 sock = socket.socket(socket.AF_INET) 1189 with sock: 1190 coro = self.loop.create_datagram_endpoint(MyProto, sock=sock) 1191 with self.assertRaisesRegex(ValueError, 1192 'A UDP Socket was expected'): 1193 self.loop.run_until_complete(coro) 1194 1195 def test_create_connection_no_host_port_sock(self): 1196 coro = self.loop.create_connection(MyProto) 1197 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1198 1199 def test_create_connection_no_getaddrinfo(self): 1200 async def getaddrinfo(*args, **kw): 1201 return [] 1202 1203 def getaddrinfo_task(*args, **kwds): 1204 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1205 1206 self.loop.getaddrinfo = getaddrinfo_task 1207 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1208 self.assertRaises( 1209 OSError, self.loop.run_until_complete, coro) 1210 1211 def test_create_connection_connect_err(self): 1212 async def getaddrinfo(*args, **kw): 1213 return [(2, 1, 6, '', ('107.6.106.82', 80))] 1214 1215 def getaddrinfo_task(*args, **kwds): 1216 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1217 1218 self.loop.getaddrinfo = getaddrinfo_task 1219 self.loop.sock_connect = mock.Mock() 1220 self.loop.sock_connect.side_effect = OSError 1221 1222 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1223 self.assertRaises( 1224 OSError, self.loop.run_until_complete, coro) 1225 1226 def test_create_connection_multiple(self): 1227 async def getaddrinfo(*args, **kw): 1228 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1229 (2, 1, 6, '', ('0.0.0.2', 80))] 1230 1231 def getaddrinfo_task(*args, **kwds): 1232 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1233 1234 self.loop.getaddrinfo = getaddrinfo_task 1235 self.loop.sock_connect = mock.Mock() 1236 self.loop.sock_connect.side_effect = OSError 1237 1238 coro = self.loop.create_connection( 1239 MyProto, 'example.com', 80, family=socket.AF_INET) 1240 with self.assertRaises(OSError): 1241 self.loop.run_until_complete(coro) 1242 1243 @patch_socket 1244 def test_create_connection_multiple_errors_local_addr(self, m_socket): 1245 1246 def bind(addr): 1247 if addr[0] == '0.0.0.1': 1248 err = OSError('Err') 1249 err.strerror = 'Err' 1250 raise err 1251 1252 m_socket.socket.return_value.bind = bind 1253 1254 async def getaddrinfo(*args, **kw): 1255 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1256 (2, 1, 6, '', ('0.0.0.2', 80))] 1257 1258 def getaddrinfo_task(*args, **kwds): 1259 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1260 1261 self.loop.getaddrinfo = getaddrinfo_task 1262 self.loop.sock_connect = mock.Mock() 1263 self.loop.sock_connect.side_effect = OSError('Err2') 1264 1265 coro = self.loop.create_connection( 1266 MyProto, 'example.com', 80, family=socket.AF_INET, 1267 local_addr=(None, 8080)) 1268 with self.assertRaises(OSError) as cm: 1269 self.loop.run_until_complete(coro) 1270 1271 self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) 1272 self.assertTrue(m_socket.socket.return_value.close.called) 1273 1274 def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): 1275 # Test the fallback code, even if this system has inet_pton. 1276 if not allow_inet_pton: 1277 del m_socket.inet_pton 1278 1279 m_socket.getaddrinfo = socket.getaddrinfo 1280 sock = m_socket.socket.return_value 1281 1282 self.loop._add_reader = mock.Mock() 1283 self.loop._add_reader._is_coroutine = False 1284 self.loop._add_writer = mock.Mock() 1285 self.loop._add_writer._is_coroutine = False 1286 1287 coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) 1288 t, p = self.loop.run_until_complete(coro) 1289 try: 1290 sock.connect.assert_called_with(('1.2.3.4', 80)) 1291 _, kwargs = m_socket.socket.call_args 1292 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1293 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1294 finally: 1295 t.close() 1296 test_utils.run_briefly(self.loop) # allow transport to close 1297 1298 if socket_helper.IPV6_ENABLED: 1299 sock.family = socket.AF_INET6 1300 coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) 1301 t, p = self.loop.run_until_complete(coro) 1302 try: 1303 # Without inet_pton we use getaddrinfo, which transforms 1304 # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info, 1305 # scope id. 1306 [address] = sock.connect.call_args[0] 1307 host, port = address[:2] 1308 self.assertRegex(host, r'::(0\.)*1') 1309 self.assertEqual(port, 80) 1310 _, kwargs = m_socket.socket.call_args 1311 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1312 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1313 finally: 1314 t.close() 1315 test_utils.run_briefly(self.loop) # allow transport to close 1316 1317 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') 1318 @unittest.skipIf(sys.platform.startswith('aix'), 1319 "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX") 1320 @patch_socket 1321 def test_create_connection_ipv6_scope(self, m_socket): 1322 m_socket.getaddrinfo = socket.getaddrinfo 1323 sock = m_socket.socket.return_value 1324 sock.family = socket.AF_INET6 1325 1326 self.loop._add_reader = mock.Mock() 1327 self.loop._add_reader._is_coroutine = False 1328 self.loop._add_writer = mock.Mock() 1329 self.loop._add_writer._is_coroutine = False 1330 1331 coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80) 1332 t, p = self.loop.run_until_complete(coro) 1333 try: 1334 sock.connect.assert_called_with(('fe80::1', 80, 0, 1)) 1335 _, kwargs = m_socket.socket.call_args 1336 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1337 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1338 finally: 1339 t.close() 1340 test_utils.run_briefly(self.loop) # allow transport to close 1341 1342 @patch_socket 1343 def test_create_connection_ip_addr(self, m_socket): 1344 self._test_create_connection_ip_addr(m_socket, True) 1345 1346 @patch_socket 1347 def test_create_connection_no_inet_pton(self, m_socket): 1348 self._test_create_connection_ip_addr(m_socket, False) 1349 1350 @patch_socket 1351 def test_create_connection_service_name(self, m_socket): 1352 m_socket.getaddrinfo = socket.getaddrinfo 1353 sock = m_socket.socket.return_value 1354 1355 self.loop._add_reader = mock.Mock() 1356 self.loop._add_reader._is_coroutine = False 1357 self.loop._add_writer = mock.Mock() 1358 self.loop._add_writer._is_coroutine = False 1359 1360 for service, port in ('http', 80), (b'http', 80): 1361 coro = self.loop.create_connection(asyncio.Protocol, 1362 '127.0.0.1', service) 1363 1364 t, p = self.loop.run_until_complete(coro) 1365 try: 1366 sock.connect.assert_called_with(('127.0.0.1', port)) 1367 _, kwargs = m_socket.socket.call_args 1368 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1369 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1370 finally: 1371 t.close() 1372 test_utils.run_briefly(self.loop) # allow transport to close 1373 1374 for service in 'nonsense', b'nonsense': 1375 coro = self.loop.create_connection(asyncio.Protocol, 1376 '127.0.0.1', service) 1377 1378 with self.assertRaises(OSError): 1379 self.loop.run_until_complete(coro) 1380 1381 def test_create_connection_no_local_addr(self): 1382 async def getaddrinfo(host, *args, **kw): 1383 if host == 'example.com': 1384 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1385 (2, 1, 6, '', ('107.6.106.82', 80))] 1386 else: 1387 return [] 1388 1389 def getaddrinfo_task(*args, **kwds): 1390 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1391 self.loop.getaddrinfo = getaddrinfo_task 1392 1393 coro = self.loop.create_connection( 1394 MyProto, 'example.com', 80, family=socket.AF_INET, 1395 local_addr=(None, 8080)) 1396 self.assertRaises( 1397 OSError, self.loop.run_until_complete, coro) 1398 1399 @patch_socket 1400 def test_create_connection_bluetooth(self, m_socket): 1401 # See http://bugs.python.org/issue27136, fallback to getaddrinfo when 1402 # we can't recognize an address is resolved, e.g. a Bluetooth address. 1403 addr = ('00:01:02:03:04:05', 1) 1404 1405 def getaddrinfo(host, port, *args, **kw): 1406 self.assertEqual((host, port), addr) 1407 return [(999, 1, 999, '', (addr, 1))] 1408 1409 m_socket.getaddrinfo = getaddrinfo 1410 sock = m_socket.socket() 1411 coro = self.loop.sock_connect(sock, addr) 1412 self.loop.run_until_complete(coro) 1413 1414 def test_create_connection_ssl_server_hostname_default(self): 1415 self.loop.getaddrinfo = mock.Mock() 1416 1417 def mock_getaddrinfo(*args, **kwds): 1418 f = self.loop.create_future() 1419 f.set_result([(socket.AF_INET, socket.SOCK_STREAM, 1420 socket.SOL_TCP, '', ('1.2.3.4', 80))]) 1421 return f 1422 1423 self.loop.getaddrinfo.side_effect = mock_getaddrinfo 1424 self.loop.sock_connect = mock.Mock() 1425 self.loop.sock_connect.return_value = self.loop.create_future() 1426 self.loop.sock_connect.return_value.set_result(None) 1427 self.loop._make_ssl_transport = mock.Mock() 1428 1429 class _SelectorTransportMock: 1430 _sock = None 1431 1432 def get_extra_info(self, key): 1433 return mock.Mock() 1434 1435 def close(self): 1436 self._sock.close() 1437 1438 def mock_make_ssl_transport(sock, protocol, sslcontext, waiter, 1439 **kwds): 1440 waiter.set_result(None) 1441 transport = _SelectorTransportMock() 1442 transport._sock = sock 1443 return transport 1444 1445 self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport 1446 ANY = mock.ANY 1447 handshake_timeout = object() 1448 # First try the default server_hostname. 1449 self.loop._make_ssl_transport.reset_mock() 1450 coro = self.loop.create_connection( 1451 MyProto, 'python.org', 80, ssl=True, 1452 ssl_handshake_timeout=handshake_timeout) 1453 transport, _ = self.loop.run_until_complete(coro) 1454 transport.close() 1455 self.loop._make_ssl_transport.assert_called_with( 1456 ANY, ANY, ANY, ANY, 1457 server_side=False, 1458 server_hostname='python.org', 1459 ssl_handshake_timeout=handshake_timeout) 1460 # Next try an explicit server_hostname. 1461 self.loop._make_ssl_transport.reset_mock() 1462 coro = self.loop.create_connection( 1463 MyProto, 'python.org', 80, ssl=True, 1464 server_hostname='perl.com', 1465 ssl_handshake_timeout=handshake_timeout) 1466 transport, _ = self.loop.run_until_complete(coro) 1467 transport.close() 1468 self.loop._make_ssl_transport.assert_called_with( 1469 ANY, ANY, ANY, ANY, 1470 server_side=False, 1471 server_hostname='perl.com', 1472 ssl_handshake_timeout=handshake_timeout) 1473 # Finally try an explicit empty server_hostname. 1474 self.loop._make_ssl_transport.reset_mock() 1475 coro = self.loop.create_connection( 1476 MyProto, 'python.org', 80, ssl=True, 1477 server_hostname='', 1478 ssl_handshake_timeout=handshake_timeout) 1479 transport, _ = self.loop.run_until_complete(coro) 1480 transport.close() 1481 self.loop._make_ssl_transport.assert_called_with( 1482 ANY, ANY, ANY, ANY, 1483 server_side=False, 1484 server_hostname='', 1485 ssl_handshake_timeout=handshake_timeout) 1486 1487 def test_create_connection_no_ssl_server_hostname_errors(self): 1488 # When not using ssl, server_hostname must be None. 1489 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1490 server_hostname='') 1491 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1492 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1493 server_hostname='python.org') 1494 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1495 1496 def test_create_connection_ssl_server_hostname_errors(self): 1497 # When using ssl, server_hostname may be None if host is non-empty. 1498 coro = self.loop.create_connection(MyProto, '', 80, ssl=True) 1499 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1500 coro = self.loop.create_connection(MyProto, None, 80, ssl=True) 1501 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1502 sock = socket.socket() 1503 coro = self.loop.create_connection(MyProto, None, None, 1504 ssl=True, sock=sock) 1505 self.addCleanup(sock.close) 1506 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1507 1508 def test_create_connection_ssl_timeout_for_plain_socket(self): 1509 coro = self.loop.create_connection( 1510 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1511 with self.assertRaisesRegex( 1512 ValueError, 1513 'ssl_handshake_timeout is only meaningful with ssl'): 1514 self.loop.run_until_complete(coro) 1515 1516 def test_create_server_empty_host(self): 1517 # if host is empty string use None instead 1518 host = object() 1519 1520 async def getaddrinfo(*args, **kw): 1521 nonlocal host 1522 host = args[0] 1523 return [] 1524 1525 def getaddrinfo_task(*args, **kwds): 1526 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1527 1528 self.loop.getaddrinfo = getaddrinfo_task 1529 fut = self.loop.create_server(MyProto, '', 0) 1530 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1531 self.assertIsNone(host) 1532 1533 def test_create_server_host_port_sock(self): 1534 fut = self.loop.create_server( 1535 MyProto, '0.0.0.0', 0, sock=object()) 1536 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1537 1538 def test_create_server_no_host_port_sock(self): 1539 fut = self.loop.create_server(MyProto) 1540 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1541 1542 def test_create_server_no_getaddrinfo(self): 1543 getaddrinfo = self.loop.getaddrinfo = mock.Mock() 1544 getaddrinfo.return_value = self.loop.create_future() 1545 getaddrinfo.return_value.set_result(None) 1546 1547 f = self.loop.create_server(MyProto, 'python.org', 0) 1548 self.assertRaises(OSError, self.loop.run_until_complete, f) 1549 1550 @patch_socket 1551 def test_create_server_nosoreuseport(self, m_socket): 1552 m_socket.getaddrinfo = socket.getaddrinfo 1553 del m_socket.SO_REUSEPORT 1554 m_socket.socket.return_value = mock.Mock() 1555 1556 f = self.loop.create_server( 1557 MyProto, '0.0.0.0', 0, reuse_port=True) 1558 1559 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1560 1561 @patch_socket 1562 def test_create_server_soreuseport_only_defined(self, m_socket): 1563 m_socket.getaddrinfo = socket.getaddrinfo 1564 m_socket.socket.return_value = mock.Mock() 1565 m_socket.SO_REUSEPORT = -1 1566 1567 f = self.loop.create_server( 1568 MyProto, '0.0.0.0', 0, reuse_port=True) 1569 1570 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1571 1572 @patch_socket 1573 def test_create_server_cant_bind(self, m_socket): 1574 1575 class Err(OSError): 1576 strerror = 'error' 1577 1578 m_socket.getaddrinfo.return_value = [ 1579 (2, 1, 6, '', ('127.0.0.1', 10100))] 1580 m_socket.getaddrinfo._is_coroutine = False 1581 m_sock = m_socket.socket.return_value = mock.Mock() 1582 m_sock.bind.side_effect = Err 1583 1584 fut = self.loop.create_server(MyProto, '0.0.0.0', 0) 1585 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1586 self.assertTrue(m_sock.close.called) 1587 1588 @patch_socket 1589 def test_create_datagram_endpoint_no_addrinfo(self, m_socket): 1590 m_socket.getaddrinfo.return_value = [] 1591 m_socket.getaddrinfo._is_coroutine = False 1592 1593 coro = self.loop.create_datagram_endpoint( 1594 MyDatagramProto, local_addr=('localhost', 0)) 1595 self.assertRaises( 1596 OSError, self.loop.run_until_complete, coro) 1597 1598 def test_create_datagram_endpoint_addr_error(self): 1599 coro = self.loop.create_datagram_endpoint( 1600 MyDatagramProto, local_addr='localhost') 1601 self.assertRaises( 1602 AssertionError, self.loop.run_until_complete, coro) 1603 coro = self.loop.create_datagram_endpoint( 1604 MyDatagramProto, local_addr=('localhost', 1, 2, 3)) 1605 self.assertRaises( 1606 AssertionError, self.loop.run_until_complete, coro) 1607 1608 def test_create_datagram_endpoint_connect_err(self): 1609 self.loop.sock_connect = mock.Mock() 1610 self.loop.sock_connect.side_effect = OSError 1611 1612 coro = self.loop.create_datagram_endpoint( 1613 asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) 1614 self.assertRaises( 1615 OSError, self.loop.run_until_complete, coro) 1616 1617 def test_create_datagram_endpoint_allow_broadcast(self): 1618 protocol = MyDatagramProto(create_future=True, loop=self.loop) 1619 self.loop.sock_connect = sock_connect = mock.Mock() 1620 sock_connect.return_value = [] 1621 1622 coro = self.loop.create_datagram_endpoint( 1623 lambda: protocol, 1624 remote_addr=('127.0.0.1', 0), 1625 allow_broadcast=True) 1626 1627 transport, _ = self.loop.run_until_complete(coro) 1628 self.assertFalse(sock_connect.called) 1629 1630 transport.close() 1631 self.loop.run_until_complete(protocol.done) 1632 self.assertEqual('CLOSED', protocol.state) 1633 1634 @patch_socket 1635 def test_create_datagram_endpoint_socket_err(self, m_socket): 1636 m_socket.getaddrinfo = socket.getaddrinfo 1637 m_socket.socket.side_effect = OSError 1638 1639 coro = self.loop.create_datagram_endpoint( 1640 asyncio.DatagramProtocol, family=socket.AF_INET) 1641 self.assertRaises( 1642 OSError, self.loop.run_until_complete, coro) 1643 1644 coro = self.loop.create_datagram_endpoint( 1645 asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0)) 1646 self.assertRaises( 1647 OSError, self.loop.run_until_complete, coro) 1648 1649 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 1650 def test_create_datagram_endpoint_no_matching_family(self): 1651 coro = self.loop.create_datagram_endpoint( 1652 asyncio.DatagramProtocol, 1653 remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) 1654 self.assertRaises( 1655 ValueError, self.loop.run_until_complete, coro) 1656 1657 @patch_socket 1658 def test_create_datagram_endpoint_setblk_err(self, m_socket): 1659 m_socket.socket.return_value.setblocking.side_effect = OSError 1660 1661 coro = self.loop.create_datagram_endpoint( 1662 asyncio.DatagramProtocol, family=socket.AF_INET) 1663 self.assertRaises( 1664 OSError, self.loop.run_until_complete, coro) 1665 self.assertTrue( 1666 m_socket.socket.return_value.close.called) 1667 1668 def test_create_datagram_endpoint_noaddr_nofamily(self): 1669 coro = self.loop.create_datagram_endpoint( 1670 asyncio.DatagramProtocol) 1671 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1672 1673 @patch_socket 1674 def test_create_datagram_endpoint_cant_bind(self, m_socket): 1675 class Err(OSError): 1676 pass 1677 1678 m_socket.getaddrinfo = socket.getaddrinfo 1679 m_sock = m_socket.socket.return_value = mock.Mock() 1680 m_sock.bind.side_effect = Err 1681 1682 fut = self.loop.create_datagram_endpoint( 1683 MyDatagramProto, 1684 local_addr=('127.0.0.1', 0), family=socket.AF_INET) 1685 self.assertRaises(Err, self.loop.run_until_complete, fut) 1686 self.assertTrue(m_sock.close.called) 1687 1688 def test_create_datagram_endpoint_sock(self): 1689 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1690 sock.bind(('127.0.0.1', 0)) 1691 fut = self.loop.create_datagram_endpoint( 1692 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1693 sock=sock) 1694 transport, protocol = self.loop.run_until_complete(fut) 1695 transport.close() 1696 self.loop.run_until_complete(protocol.done) 1697 self.assertEqual('CLOSED', protocol.state) 1698 1699 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 1700 def test_create_datagram_endpoint_sock_unix(self): 1701 fut = self.loop.create_datagram_endpoint( 1702 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1703 family=socket.AF_UNIX) 1704 transport, protocol = self.loop.run_until_complete(fut) 1705 self.assertEqual(transport._sock.family, socket.AF_UNIX) 1706 transport.close() 1707 self.loop.run_until_complete(protocol.done) 1708 self.assertEqual('CLOSED', protocol.state) 1709 1710 @socket_helper.skip_unless_bind_unix_socket 1711 def test_create_datagram_endpoint_existing_sock_unix(self): 1712 with test_utils.unix_socket_path() as path: 1713 sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) 1714 sock.bind(path) 1715 sock.close() 1716 1717 coro = self.loop.create_datagram_endpoint( 1718 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1719 path, family=socket.AF_UNIX) 1720 transport, protocol = self.loop.run_until_complete(coro) 1721 transport.close() 1722 self.loop.run_until_complete(protocol.done) 1723 1724 def test_create_datagram_endpoint_sock_sockopts(self): 1725 class FakeSock: 1726 type = socket.SOCK_DGRAM 1727 1728 fut = self.loop.create_datagram_endpoint( 1729 MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) 1730 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1731 1732 fut = self.loop.create_datagram_endpoint( 1733 MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) 1734 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1735 1736 fut = self.loop.create_datagram_endpoint( 1737 MyDatagramProto, family=1, sock=FakeSock()) 1738 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1739 1740 fut = self.loop.create_datagram_endpoint( 1741 MyDatagramProto, proto=1, sock=FakeSock()) 1742 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1743 1744 fut = self.loop.create_datagram_endpoint( 1745 MyDatagramProto, flags=1, sock=FakeSock()) 1746 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1747 1748 fut = self.loop.create_datagram_endpoint( 1749 MyDatagramProto, reuse_port=True, sock=FakeSock()) 1750 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1751 1752 fut = self.loop.create_datagram_endpoint( 1753 MyDatagramProto, allow_broadcast=True, sock=FakeSock()) 1754 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1755 1756 @unittest.skipIf(sys.platform == 'vxworks', 1757 "SO_BROADCAST is enabled by default on VxWorks") 1758 def test_create_datagram_endpoint_sockopts(self): 1759 # Socket options should not be applied unless asked for. 1760 # SO_REUSEPORT is not available on all platforms. 1761 1762 coro = self.loop.create_datagram_endpoint( 1763 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1764 local_addr=('127.0.0.1', 0)) 1765 transport, protocol = self.loop.run_until_complete(coro) 1766 sock = transport.get_extra_info('socket') 1767 1768 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1769 1770 if reuseport_supported: 1771 self.assertFalse( 1772 sock.getsockopt( 1773 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1774 self.assertFalse( 1775 sock.getsockopt( 1776 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1777 1778 transport.close() 1779 self.loop.run_until_complete(protocol.done) 1780 self.assertEqual('CLOSED', protocol.state) 1781 1782 coro = self.loop.create_datagram_endpoint( 1783 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1784 local_addr=('127.0.0.1', 0), 1785 reuse_port=reuseport_supported, 1786 allow_broadcast=True) 1787 transport, protocol = self.loop.run_until_complete(coro) 1788 sock = transport.get_extra_info('socket') 1789 1790 self.assertFalse( 1791 sock.getsockopt( 1792 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1793 if reuseport_supported: 1794 self.assertTrue( 1795 sock.getsockopt( 1796 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1797 self.assertTrue( 1798 sock.getsockopt( 1799 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1800 1801 transport.close() 1802 self.loop.run_until_complete(protocol.done) 1803 self.assertEqual('CLOSED', protocol.state) 1804 1805 def test_create_datagram_endpoint_reuse_address_error(self): 1806 # bpo-37228: Ensure that explicit passing of `reuse_address=True` 1807 # raises an error, as it is not safe to use SO_REUSEADDR when using UDP 1808 1809 coro = self.loop.create_datagram_endpoint( 1810 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1811 local_addr=('127.0.0.1', 0), 1812 reuse_address=True) 1813 1814 with self.assertRaises(ValueError): 1815 self.loop.run_until_complete(coro) 1816 1817 def test_create_datagram_endpoint_reuse_address_warning(self): 1818 # bpo-37228: Deprecate *reuse_address* parameter 1819 1820 coro = self.loop.create_datagram_endpoint( 1821 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1822 local_addr=('127.0.0.1', 0), 1823 reuse_address=False) 1824 1825 with self.assertWarns(DeprecationWarning): 1826 transport, protocol = self.loop.run_until_complete(coro) 1827 transport.close() 1828 self.loop.run_until_complete(protocol.done) 1829 self.assertEqual('CLOSED', protocol.state) 1830 1831 @patch_socket 1832 def test_create_datagram_endpoint_nosoreuseport(self, m_socket): 1833 del m_socket.SO_REUSEPORT 1834 m_socket.socket.return_value = mock.Mock() 1835 1836 coro = self.loop.create_datagram_endpoint( 1837 lambda: MyDatagramProto(loop=self.loop), 1838 local_addr=('127.0.0.1', 0), 1839 reuse_port=True) 1840 1841 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1842 1843 @patch_socket 1844 def test_create_datagram_endpoint_ip_addr(self, m_socket): 1845 def getaddrinfo(*args, **kw): 1846 self.fail('should not have called getaddrinfo') 1847 1848 m_socket.getaddrinfo = getaddrinfo 1849 m_socket.socket.return_value.bind = bind = mock.Mock() 1850 self.loop._add_reader = mock.Mock() 1851 self.loop._add_reader._is_coroutine = False 1852 1853 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1854 coro = self.loop.create_datagram_endpoint( 1855 lambda: MyDatagramProto(loop=self.loop), 1856 local_addr=('1.2.3.4', 0), 1857 reuse_port=reuseport_supported) 1858 1859 t, p = self.loop.run_until_complete(coro) 1860 try: 1861 bind.assert_called_with(('1.2.3.4', 0)) 1862 m_socket.socket.assert_called_with(family=m_socket.AF_INET, 1863 proto=m_socket.IPPROTO_UDP, 1864 type=m_socket.SOCK_DGRAM) 1865 finally: 1866 t.close() 1867 test_utils.run_briefly(self.loop) # allow transport to close 1868 1869 def test_accept_connection_retry(self): 1870 sock = mock.Mock() 1871 sock.accept.side_effect = BlockingIOError() 1872 1873 self.loop._accept_connection(MyProto, sock) 1874 self.assertFalse(sock.close.called) 1875 1876 @mock.patch('asyncio.base_events.logger') 1877 def test_accept_connection_exception(self, m_log): 1878 sock = mock.Mock() 1879 sock.fileno.return_value = 10 1880 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') 1881 self.loop._remove_reader = mock.Mock() 1882 self.loop.call_later = mock.Mock() 1883 1884 self.loop._accept_connection(MyProto, sock) 1885 self.assertTrue(m_log.error.called) 1886 self.assertFalse(sock.close.called) 1887 self.loop._remove_reader.assert_called_with(10) 1888 self.loop.call_later.assert_called_with( 1889 constants.ACCEPT_RETRY_DELAY, 1890 # self.loop._start_serving 1891 mock.ANY, 1892 MyProto, sock, None, None, mock.ANY, mock.ANY) 1893 1894 def test_call_coroutine(self): 1895 with self.assertWarns(DeprecationWarning): 1896 @asyncio.coroutine 1897 def simple_coroutine(): 1898 pass 1899 1900 self.loop.set_debug(True) 1901 coro_func = simple_coroutine 1902 coro_obj = coro_func() 1903 self.addCleanup(coro_obj.close) 1904 for func in (coro_func, coro_obj): 1905 with self.assertRaises(TypeError): 1906 self.loop.call_soon(func) 1907 with self.assertRaises(TypeError): 1908 self.loop.call_soon_threadsafe(func) 1909 with self.assertRaises(TypeError): 1910 self.loop.call_later(60, func) 1911 with self.assertRaises(TypeError): 1912 self.loop.call_at(self.loop.time() + 60, func) 1913 with self.assertRaises(TypeError): 1914 self.loop.run_until_complete( 1915 self.loop.run_in_executor(None, func)) 1916 1917 @mock.patch('asyncio.base_events.logger') 1918 def test_log_slow_callbacks(self, m_logger): 1919 def stop_loop_cb(loop): 1920 loop.stop() 1921 1922 async def stop_loop_coro(loop): 1923 loop.stop() 1924 1925 asyncio.set_event_loop(self.loop) 1926 self.loop.set_debug(True) 1927 self.loop.slow_callback_duration = 0.0 1928 1929 # slow callback 1930 self.loop.call_soon(stop_loop_cb, self.loop) 1931 self.loop.run_forever() 1932 fmt, *args = m_logger.warning.call_args[0] 1933 self.assertRegex(fmt % tuple(args), 1934 "^Executing <Handle.*stop_loop_cb.*> " 1935 "took .* seconds$") 1936 1937 # slow task 1938 asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop) 1939 self.loop.run_forever() 1940 fmt, *args = m_logger.warning.call_args[0] 1941 self.assertRegex(fmt % tuple(args), 1942 "^Executing <Task.*stop_loop_coro.*> " 1943 "took .* seconds$") 1944 1945 1946class RunningLoopTests(unittest.TestCase): 1947 1948 def test_running_loop_within_a_loop(self): 1949 async def runner(loop): 1950 loop.run_forever() 1951 1952 loop = asyncio.new_event_loop() 1953 outer_loop = asyncio.new_event_loop() 1954 try: 1955 with self.assertRaisesRegex(RuntimeError, 1956 'while another loop is running'): 1957 outer_loop.run_until_complete(runner(loop)) 1958 finally: 1959 loop.close() 1960 outer_loop.close() 1961 1962 1963class BaseLoopSockSendfileTests(test_utils.TestCase): 1964 1965 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 1966 1967 class MyProto(asyncio.Protocol): 1968 1969 def __init__(self, loop): 1970 self.started = False 1971 self.closed = False 1972 self.data = bytearray() 1973 self.fut = loop.create_future() 1974 self.transport = None 1975 1976 def connection_made(self, transport): 1977 self.started = True 1978 self.transport = transport 1979 1980 def data_received(self, data): 1981 self.data.extend(data) 1982 1983 def connection_lost(self, exc): 1984 self.closed = True 1985 self.fut.set_result(None) 1986 self.transport = None 1987 1988 async def wait_closed(self): 1989 await self.fut 1990 1991 @classmethod 1992 def setUpClass(cls): 1993 cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE 1994 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 1995 with open(os_helper.TESTFN, 'wb') as fp: 1996 fp.write(cls.DATA) 1997 super().setUpClass() 1998 1999 @classmethod 2000 def tearDownClass(cls): 2001 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize 2002 os_helper.unlink(os_helper.TESTFN) 2003 super().tearDownClass() 2004 2005 def setUp(self): 2006 from asyncio.selector_events import BaseSelectorEventLoop 2007 # BaseSelectorEventLoop() has no native implementation 2008 self.loop = BaseSelectorEventLoop() 2009 self.set_event_loop(self.loop) 2010 self.file = open(os_helper.TESTFN, 'rb') 2011 self.addCleanup(self.file.close) 2012 super().setUp() 2013 2014 def make_socket(self, blocking=False): 2015 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 2016 sock.setblocking(blocking) 2017 self.addCleanup(sock.close) 2018 return sock 2019 2020 def run_loop(self, coro): 2021 return self.loop.run_until_complete(coro) 2022 2023 def prepare(self): 2024 sock = self.make_socket() 2025 proto = self.MyProto(self.loop) 2026 server = self.run_loop(self.loop.create_server( 2027 lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET)) 2028 addr = server.sockets[0].getsockname() 2029 2030 for _ in range(10): 2031 try: 2032 self.run_loop(self.loop.sock_connect(sock, addr)) 2033 except OSError: 2034 self.run_loop(asyncio.sleep(0.5)) 2035 continue 2036 else: 2037 break 2038 else: 2039 # One last try, so we get the exception 2040 self.run_loop(self.loop.sock_connect(sock, addr)) 2041 2042 def cleanup(): 2043 server.close() 2044 self.run_loop(server.wait_closed()) 2045 sock.close() 2046 if proto.transport is not None: 2047 proto.transport.close() 2048 self.run_loop(proto.wait_closed()) 2049 2050 self.addCleanup(cleanup) 2051 2052 return sock, proto 2053 2054 def test__sock_sendfile_native_failure(self): 2055 sock, proto = self.prepare() 2056 2057 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2058 "sendfile is not available"): 2059 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 2060 0, None)) 2061 2062 self.assertEqual(proto.data, b'') 2063 self.assertEqual(self.file.tell(), 0) 2064 2065 def test_sock_sendfile_no_fallback(self): 2066 sock, proto = self.prepare() 2067 2068 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2069 "sendfile is not available"): 2070 self.run_loop(self.loop.sock_sendfile(sock, self.file, 2071 fallback=False)) 2072 2073 self.assertEqual(self.file.tell(), 0) 2074 self.assertEqual(proto.data, b'') 2075 2076 def test_sock_sendfile_fallback(self): 2077 sock, proto = self.prepare() 2078 2079 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2080 sock.close() 2081 self.run_loop(proto.wait_closed()) 2082 2083 self.assertEqual(ret, len(self.DATA)) 2084 self.assertEqual(self.file.tell(), len(self.DATA)) 2085 self.assertEqual(proto.data, self.DATA) 2086 2087 def test_sock_sendfile_fallback_offset_and_count(self): 2088 sock, proto = self.prepare() 2089 2090 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, 2091 1000, 2000)) 2092 sock.close() 2093 self.run_loop(proto.wait_closed()) 2094 2095 self.assertEqual(ret, 2000) 2096 self.assertEqual(self.file.tell(), 3000) 2097 self.assertEqual(proto.data, self.DATA[1000:3000]) 2098 2099 def test_blocking_socket(self): 2100 self.loop.set_debug(True) 2101 sock = self.make_socket(blocking=True) 2102 with self.assertRaisesRegex(ValueError, "must be non-blocking"): 2103 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2104 2105 def test_nonbinary_file(self): 2106 sock = self.make_socket() 2107 with open(os_helper.TESTFN, encoding="utf-8") as f: 2108 with self.assertRaisesRegex(ValueError, "binary mode"): 2109 self.run_loop(self.loop.sock_sendfile(sock, f)) 2110 2111 def test_nonstream_socket(self): 2112 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 2113 sock.setblocking(False) 2114 self.addCleanup(sock.close) 2115 with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"): 2116 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2117 2118 def test_notint_count(self): 2119 sock = self.make_socket() 2120 with self.assertRaisesRegex(TypeError, 2121 "count must be a positive integer"): 2122 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count')) 2123 2124 def test_negative_count(self): 2125 sock = self.make_socket() 2126 with self.assertRaisesRegex(ValueError, 2127 "count must be a positive integer"): 2128 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1)) 2129 2130 def test_notint_offset(self): 2131 sock = self.make_socket() 2132 with self.assertRaisesRegex(TypeError, 2133 "offset must be a non-negative integer"): 2134 self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset')) 2135 2136 def test_negative_offset(self): 2137 sock = self.make_socket() 2138 with self.assertRaisesRegex(ValueError, 2139 "offset must be a non-negative integer"): 2140 self.run_loop(self.loop.sock_sendfile(sock, self.file, -1)) 2141 2142 2143class TestSelectorUtils(test_utils.TestCase): 2144 def check_set_nodelay(self, sock): 2145 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2146 self.assertFalse(opt) 2147 2148 base_events._set_nodelay(sock) 2149 2150 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2151 self.assertTrue(opt) 2152 2153 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 2154 'need socket.TCP_NODELAY') 2155 def test_set_nodelay(self): 2156 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2157 proto=socket.IPPROTO_TCP) 2158 with sock: 2159 self.check_set_nodelay(sock) 2160 2161 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2162 proto=socket.IPPROTO_TCP) 2163 with sock: 2164 sock.setblocking(False) 2165 self.check_set_nodelay(sock) 2166 2167 2168 2169if __name__ == '__main__': 2170 unittest.main() 2171