1"""Tests for base_events.py""" 2 3import concurrent.futures 4import errno 5import math 6import os 7import socket 8import sys 9import threading 10import time 11import unittest 12from unittest import mock 13 14import asyncio 15from asyncio import base_events 16from asyncio import constants 17from test.test_asyncio import utils as test_utils 18from test import support 19from test.support.script_helper import assert_python_ok 20 21 22MOCK_ANY = mock.ANY 23PY34 = sys.version_info >= (3, 4) 24 25 26def tearDownModule(): 27 asyncio.set_event_loop_policy(None) 28 29 30def mock_socket_module(): 31 m_socket = mock.MagicMock(spec=socket) 32 for name in ( 33 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP', 34 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton' 35 ): 36 if hasattr(socket, name): 37 setattr(m_socket, name, getattr(socket, name)) 38 else: 39 delattr(m_socket, name) 40 41 m_socket.socket = mock.MagicMock() 42 m_socket.socket.return_value = test_utils.mock_nonblocking_socket() 43 m_socket.getaddrinfo._is_coroutine = False 44 45 return m_socket 46 47 48def patch_socket(f): 49 return mock.patch('asyncio.base_events.socket', 50 new_callable=mock_socket_module)(f) 51 52 53class BaseEventTests(test_utils.TestCase): 54 55 def test_ipaddr_info(self): 56 UNSPEC = socket.AF_UNSPEC 57 INET = socket.AF_INET 58 INET6 = socket.AF_INET6 59 STREAM = socket.SOCK_STREAM 60 DGRAM = socket.SOCK_DGRAM 61 TCP = socket.IPPROTO_TCP 62 UDP = socket.IPPROTO_UDP 63 64 self.assertEqual( 65 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 66 base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP)) 67 68 self.assertEqual( 69 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 70 base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP)) 71 72 self.assertEqual( 73 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 74 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP)) 75 76 self.assertEqual( 77 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 78 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP)) 79 80 # Socket type STREAM implies TCP protocol. 81 self.assertEqual( 82 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 83 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0)) 84 85 # Socket type DGRAM implies UDP protocol. 86 self.assertEqual( 87 (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 88 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0)) 89 90 # No socket type. 91 self.assertIsNone( 92 base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) 93 94 if support.IPV6_ENABLED: 95 # IPv4 address with family IPv6. 96 self.assertIsNone( 97 base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) 98 99 self.assertEqual( 100 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 101 base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP)) 102 103 self.assertEqual( 104 (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 105 base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP)) 106 107 # IPv6 address with family IPv4. 108 self.assertIsNone( 109 base_events._ipaddr_info('::3', 1, INET, STREAM, TCP)) 110 111 # IPv6 address with zone index. 112 self.assertIsNone( 113 base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP)) 114 115 def test_port_parameter_types(self): 116 # Test obscure kinds of arguments for "port". 117 INET = socket.AF_INET 118 STREAM = socket.SOCK_STREAM 119 TCP = socket.IPPROTO_TCP 120 121 self.assertEqual( 122 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 123 base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP)) 124 125 self.assertEqual( 126 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 127 base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP)) 128 129 self.assertEqual( 130 (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 131 base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP)) 132 133 self.assertEqual( 134 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 135 base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP)) 136 137 self.assertEqual( 138 (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 139 base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) 140 141 @patch_socket 142 def test_ipaddr_info_no_inet_pton(self, m_socket): 143 del m_socket.inet_pton 144 self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1, 145 socket.AF_INET, 146 socket.SOCK_STREAM, 147 socket.IPPROTO_TCP)) 148 149 150class BaseEventLoopTests(test_utils.TestCase): 151 152 def setUp(self): 153 super().setUp() 154 self.loop = base_events.BaseEventLoop() 155 self.loop._selector = mock.Mock() 156 self.loop._selector.select.return_value = () 157 self.set_event_loop(self.loop) 158 159 def test_not_implemented(self): 160 m = mock.Mock() 161 self.assertRaises( 162 NotImplementedError, 163 self.loop._make_socket_transport, m, m) 164 self.assertRaises( 165 NotImplementedError, 166 self.loop._make_ssl_transport, m, m, m, m) 167 self.assertRaises( 168 NotImplementedError, 169 self.loop._make_datagram_transport, m, m) 170 self.assertRaises( 171 NotImplementedError, self.loop._process_events, []) 172 self.assertRaises( 173 NotImplementedError, self.loop._write_to_self) 174 self.assertRaises( 175 NotImplementedError, 176 self.loop._make_read_pipe_transport, m, m) 177 self.assertRaises( 178 NotImplementedError, 179 self.loop._make_write_pipe_transport, m, m) 180 gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m) 181 with self.assertRaises(NotImplementedError): 182 gen.send(None) 183 184 def test_close(self): 185 self.assertFalse(self.loop.is_closed()) 186 self.loop.close() 187 self.assertTrue(self.loop.is_closed()) 188 189 # it should be possible to call close() more than once 190 self.loop.close() 191 self.loop.close() 192 193 # operation blocked when the loop is closed 194 f = self.loop.create_future() 195 self.assertRaises(RuntimeError, self.loop.run_forever) 196 self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 197 198 def test__add_callback_handle(self): 199 h = asyncio.Handle(lambda: False, (), self.loop, None) 200 201 self.loop._add_callback(h) 202 self.assertFalse(self.loop._scheduled) 203 self.assertIn(h, self.loop._ready) 204 205 def test__add_callback_cancelled_handle(self): 206 h = asyncio.Handle(lambda: False, (), self.loop, None) 207 h.cancel() 208 209 self.loop._add_callback(h) 210 self.assertFalse(self.loop._scheduled) 211 self.assertFalse(self.loop._ready) 212 213 def test_set_default_executor(self): 214 class DummyExecutor(concurrent.futures.ThreadPoolExecutor): 215 def submit(self, fn, *args, **kwargs): 216 raise NotImplementedError( 217 'cannot submit into a dummy executor') 218 219 executor = DummyExecutor() 220 self.loop.set_default_executor(executor) 221 self.assertIs(executor, self.loop._default_executor) 222 223 def test_set_default_executor_deprecation_warnings(self): 224 executor = mock.Mock() 225 226 with self.assertWarns(DeprecationWarning): 227 self.loop.set_default_executor(executor) 228 229 def test_call_soon(self): 230 def cb(): 231 pass 232 233 h = self.loop.call_soon(cb) 234 self.assertEqual(h._callback, cb) 235 self.assertIsInstance(h, asyncio.Handle) 236 self.assertIn(h, self.loop._ready) 237 238 def test_call_soon_non_callable(self): 239 self.loop.set_debug(True) 240 with self.assertRaisesRegex(TypeError, 'a callable object'): 241 self.loop.call_soon(1) 242 243 def test_call_later(self): 244 def cb(): 245 pass 246 247 h = self.loop.call_later(10.0, cb) 248 self.assertIsInstance(h, asyncio.TimerHandle) 249 self.assertIn(h, self.loop._scheduled) 250 self.assertNotIn(h, self.loop._ready) 251 252 def test_call_later_negative_delays(self): 253 calls = [] 254 255 def cb(arg): 256 calls.append(arg) 257 258 self.loop._process_events = mock.Mock() 259 self.loop.call_later(-1, cb, 'a') 260 self.loop.call_later(-2, cb, 'b') 261 test_utils.run_briefly(self.loop) 262 self.assertEqual(calls, ['b', 'a']) 263 264 def test_time_and_call_at(self): 265 def cb(): 266 self.loop.stop() 267 268 self.loop._process_events = mock.Mock() 269 delay = 0.1 270 271 when = self.loop.time() + delay 272 self.loop.call_at(when, cb) 273 t0 = self.loop.time() 274 self.loop.run_forever() 275 dt = self.loop.time() - t0 276 277 # 50 ms: maximum granularity of the event loop 278 self.assertGreaterEqual(dt, delay - 0.050, dt) 279 # tolerate a difference of +800 ms because some Python buildbots 280 # are really slow 281 self.assertLessEqual(dt, 0.9, dt) 282 283 def check_thread(self, loop, debug): 284 def cb(): 285 pass 286 287 loop.set_debug(debug) 288 if debug: 289 msg = ("Non-thread-safe operation invoked on an event loop other " 290 "than the current one") 291 with self.assertRaisesRegex(RuntimeError, msg): 292 loop.call_soon(cb) 293 with self.assertRaisesRegex(RuntimeError, msg): 294 loop.call_later(60, cb) 295 with self.assertRaisesRegex(RuntimeError, msg): 296 loop.call_at(loop.time() + 60, cb) 297 else: 298 loop.call_soon(cb) 299 loop.call_later(60, cb) 300 loop.call_at(loop.time() + 60, cb) 301 302 def test_check_thread(self): 303 def check_in_thread(loop, event, debug, create_loop, fut): 304 # wait until the event loop is running 305 event.wait() 306 307 try: 308 if create_loop: 309 loop2 = base_events.BaseEventLoop() 310 try: 311 asyncio.set_event_loop(loop2) 312 self.check_thread(loop, debug) 313 finally: 314 asyncio.set_event_loop(None) 315 loop2.close() 316 else: 317 self.check_thread(loop, debug) 318 except Exception as exc: 319 loop.call_soon_threadsafe(fut.set_exception, exc) 320 else: 321 loop.call_soon_threadsafe(fut.set_result, None) 322 323 def test_thread(loop, debug, create_loop=False): 324 event = threading.Event() 325 fut = loop.create_future() 326 loop.call_soon(event.set) 327 args = (loop, event, debug, create_loop, fut) 328 thread = threading.Thread(target=check_in_thread, args=args) 329 thread.start() 330 loop.run_until_complete(fut) 331 thread.join() 332 333 self.loop._process_events = mock.Mock() 334 self.loop._write_to_self = mock.Mock() 335 336 # raise RuntimeError if the thread has no event loop 337 test_thread(self.loop, True) 338 339 # check disabled if debug mode is disabled 340 test_thread(self.loop, False) 341 342 # raise RuntimeError if the event loop of the thread is not the called 343 # event loop 344 test_thread(self.loop, True, create_loop=True) 345 346 # check disabled if debug mode is disabled 347 test_thread(self.loop, False, create_loop=True) 348 349 def test__run_once(self): 350 h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 351 self.loop, None) 352 h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 353 self.loop, None) 354 355 h1.cancel() 356 357 self.loop._process_events = mock.Mock() 358 self.loop._scheduled.append(h1) 359 self.loop._scheduled.append(h2) 360 self.loop._run_once() 361 362 t = self.loop._selector.select.call_args[0][0] 363 self.assertTrue(9.5 < t < 10.5, t) 364 self.assertEqual([h2], self.loop._scheduled) 365 self.assertTrue(self.loop._process_events.called) 366 367 def test_set_debug(self): 368 self.loop.set_debug(True) 369 self.assertTrue(self.loop.get_debug()) 370 self.loop.set_debug(False) 371 self.assertFalse(self.loop.get_debug()) 372 373 def test__run_once_schedule_handle(self): 374 handle = None 375 processed = False 376 377 def cb(loop): 378 nonlocal processed, handle 379 processed = True 380 handle = loop.call_soon(lambda: True) 381 382 h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 383 self.loop, None) 384 385 self.loop._process_events = mock.Mock() 386 self.loop._scheduled.append(h) 387 self.loop._run_once() 388 389 self.assertTrue(processed) 390 self.assertEqual([handle], list(self.loop._ready)) 391 392 def test__run_once_cancelled_event_cleanup(self): 393 self.loop._process_events = mock.Mock() 394 395 self.assertTrue( 396 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 397 398 def cb(): 399 pass 400 401 # Set up one "blocking" event that will not be cancelled to 402 # ensure later cancelled events do not make it to the head 403 # of the queue and get cleaned. 404 not_cancelled_count = 1 405 self.loop.call_later(3000, cb) 406 407 # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 408 # cancelled handles, ensure they aren't removed 409 410 cancelled_count = 2 411 for x in range(2): 412 h = self.loop.call_later(3600, cb) 413 h.cancel() 414 415 # Add some cancelled events that will be at head and removed 416 cancelled_count += 2 417 for x in range(2): 418 h = self.loop.call_later(100, cb) 419 h.cancel() 420 421 # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 422 self.assertLessEqual(cancelled_count + not_cancelled_count, 423 base_events._MIN_SCHEDULED_TIMER_HANDLES) 424 425 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 426 427 self.loop._run_once() 428 429 cancelled_count -= 2 430 431 self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 432 433 self.assertEqual(len(self.loop._scheduled), 434 cancelled_count + not_cancelled_count) 435 436 # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 437 # so that deletion of cancelled events will occur on next _run_once 438 add_cancel_count = int(math.ceil( 439 base_events._MIN_SCHEDULED_TIMER_HANDLES * 440 base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 441 442 add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 443 add_cancel_count, 0) 444 445 # Add some events that will not be cancelled 446 not_cancelled_count += add_not_cancel_count 447 for x in range(add_not_cancel_count): 448 self.loop.call_later(3600, cb) 449 450 # Add enough cancelled events 451 cancelled_count += add_cancel_count 452 for x in range(add_cancel_count): 453 h = self.loop.call_later(3600, cb) 454 h.cancel() 455 456 # Ensure all handles are still scheduled 457 self.assertEqual(len(self.loop._scheduled), 458 cancelled_count + not_cancelled_count) 459 460 self.loop._run_once() 461 462 # Ensure cancelled events were removed 463 self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 464 465 # Ensure only uncancelled events remain scheduled 466 self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 467 468 def test_run_until_complete_type_error(self): 469 self.assertRaises(TypeError, 470 self.loop.run_until_complete, 'blah') 471 472 def test_run_until_complete_loop(self): 473 task = self.loop.create_future() 474 other_loop = self.new_test_loop() 475 self.addCleanup(other_loop.close) 476 self.assertRaises(ValueError, 477 other_loop.run_until_complete, task) 478 479 def test_run_until_complete_loop_orphan_future_close_loop(self): 480 class ShowStopper(SystemExit): 481 pass 482 483 async def foo(delay): 484 await asyncio.sleep(delay) 485 486 def throw(): 487 raise ShowStopper 488 489 self.loop._process_events = mock.Mock() 490 self.loop.call_soon(throw) 491 with self.assertRaises(ShowStopper): 492 self.loop.run_until_complete(foo(0.1)) 493 494 # This call fails if run_until_complete does not clean up 495 # done-callback for the previous future. 496 self.loop.run_until_complete(foo(0.2)) 497 498 def test_subprocess_exec_invalid_args(self): 499 args = [sys.executable, '-c', 'pass'] 500 501 # missing program parameter (empty args) 502 self.assertRaises(TypeError, 503 self.loop.run_until_complete, self.loop.subprocess_exec, 504 asyncio.SubprocessProtocol) 505 506 # expected multiple arguments, not a list 507 self.assertRaises(TypeError, 508 self.loop.run_until_complete, self.loop.subprocess_exec, 509 asyncio.SubprocessProtocol, args) 510 511 # program arguments must be strings, not int 512 self.assertRaises(TypeError, 513 self.loop.run_until_complete, self.loop.subprocess_exec, 514 asyncio.SubprocessProtocol, sys.executable, 123) 515 516 # universal_newlines, shell, bufsize must not be set 517 self.assertRaises(TypeError, 518 self.loop.run_until_complete, self.loop.subprocess_exec, 519 asyncio.SubprocessProtocol, *args, universal_newlines=True) 520 self.assertRaises(TypeError, 521 self.loop.run_until_complete, self.loop.subprocess_exec, 522 asyncio.SubprocessProtocol, *args, shell=True) 523 self.assertRaises(TypeError, 524 self.loop.run_until_complete, self.loop.subprocess_exec, 525 asyncio.SubprocessProtocol, *args, bufsize=4096) 526 527 def test_subprocess_shell_invalid_args(self): 528 # expected a string, not an int or a list 529 self.assertRaises(TypeError, 530 self.loop.run_until_complete, self.loop.subprocess_shell, 531 asyncio.SubprocessProtocol, 123) 532 self.assertRaises(TypeError, 533 self.loop.run_until_complete, self.loop.subprocess_shell, 534 asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 535 536 # universal_newlines, shell, bufsize must not be set 537 self.assertRaises(TypeError, 538 self.loop.run_until_complete, self.loop.subprocess_shell, 539 asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 540 self.assertRaises(TypeError, 541 self.loop.run_until_complete, self.loop.subprocess_shell, 542 asyncio.SubprocessProtocol, 'exit 0', shell=True) 543 self.assertRaises(TypeError, 544 self.loop.run_until_complete, self.loop.subprocess_shell, 545 asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 546 547 def test_default_exc_handler_callback(self): 548 self.loop._process_events = mock.Mock() 549 550 def zero_error(fut): 551 fut.set_result(True) 552 1/0 553 554 # Test call_soon (events.Handle) 555 with mock.patch('asyncio.base_events.logger') as log: 556 fut = self.loop.create_future() 557 self.loop.call_soon(zero_error, fut) 558 fut.add_done_callback(lambda fut: self.loop.stop()) 559 self.loop.run_forever() 560 log.error.assert_called_with( 561 test_utils.MockPattern('Exception in callback.*zero'), 562 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 563 564 # Test call_later (events.TimerHandle) 565 with mock.patch('asyncio.base_events.logger') as log: 566 fut = self.loop.create_future() 567 self.loop.call_later(0.01, zero_error, fut) 568 fut.add_done_callback(lambda fut: self.loop.stop()) 569 self.loop.run_forever() 570 log.error.assert_called_with( 571 test_utils.MockPattern('Exception in callback.*zero'), 572 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 573 574 def test_default_exc_handler_coro(self): 575 self.loop._process_events = mock.Mock() 576 577 async def zero_error_coro(): 578 await asyncio.sleep(0.01) 579 1/0 580 581 # Test Future.__del__ 582 with mock.patch('asyncio.base_events.logger') as log: 583 fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 584 fut.add_done_callback(lambda *args: self.loop.stop()) 585 self.loop.run_forever() 586 fut = None # Trigger Future.__del__ or futures._TracebackLogger 587 support.gc_collect() 588 if PY34: 589 # Future.__del__ in Python 3.4 logs error with 590 # an actual exception context 591 log.error.assert_called_with( 592 test_utils.MockPattern('.*exception was never retrieved'), 593 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 594 else: 595 # futures._TracebackLogger logs only textual traceback 596 log.error.assert_called_with( 597 test_utils.MockPattern( 598 '.*exception was never retrieved.*ZeroDiv'), 599 exc_info=False) 600 601 def test_set_exc_handler_invalid(self): 602 with self.assertRaisesRegex(TypeError, 'A callable object or None'): 603 self.loop.set_exception_handler('spam') 604 605 def test_set_exc_handler_custom(self): 606 def zero_error(): 607 1/0 608 609 def run_loop(): 610 handle = self.loop.call_soon(zero_error) 611 self.loop._run_once() 612 return handle 613 614 self.loop.set_debug(True) 615 self.loop._process_events = mock.Mock() 616 617 self.assertIsNone(self.loop.get_exception_handler()) 618 mock_handler = mock.Mock() 619 self.loop.set_exception_handler(mock_handler) 620 self.assertIs(self.loop.get_exception_handler(), mock_handler) 621 handle = run_loop() 622 mock_handler.assert_called_with(self.loop, { 623 'exception': MOCK_ANY, 624 'message': test_utils.MockPattern( 625 'Exception in callback.*zero_error'), 626 'handle': handle, 627 'source_traceback': handle._source_traceback, 628 }) 629 mock_handler.reset_mock() 630 631 self.loop.set_exception_handler(None) 632 with mock.patch('asyncio.base_events.logger') as log: 633 run_loop() 634 log.error.assert_called_with( 635 test_utils.MockPattern( 636 'Exception in callback.*zero'), 637 exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 638 639 assert not mock_handler.called 640 641 def test_set_exc_handler_broken(self): 642 def run_loop(): 643 def zero_error(): 644 1/0 645 self.loop.call_soon(zero_error) 646 self.loop._run_once() 647 648 def handler(loop, context): 649 raise AttributeError('spam') 650 651 self.loop._process_events = mock.Mock() 652 653 self.loop.set_exception_handler(handler) 654 655 with mock.patch('asyncio.base_events.logger') as log: 656 run_loop() 657 log.error.assert_called_with( 658 test_utils.MockPattern( 659 'Unhandled error in exception handler'), 660 exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 661 662 def test_default_exc_handler_broken(self): 663 _context = None 664 665 class Loop(base_events.BaseEventLoop): 666 667 _selector = mock.Mock() 668 _process_events = mock.Mock() 669 670 def default_exception_handler(self, context): 671 nonlocal _context 672 _context = context 673 # Simulates custom buggy "default_exception_handler" 674 raise ValueError('spam') 675 676 loop = Loop() 677 self.addCleanup(loop.close) 678 asyncio.set_event_loop(loop) 679 680 def run_loop(): 681 def zero_error(): 682 1/0 683 loop.call_soon(zero_error) 684 loop._run_once() 685 686 with mock.patch('asyncio.base_events.logger') as log: 687 run_loop() 688 log.error.assert_called_with( 689 'Exception in default exception handler', 690 exc_info=True) 691 692 def custom_handler(loop, context): 693 raise ValueError('ham') 694 695 _context = None 696 loop.set_exception_handler(custom_handler) 697 with mock.patch('asyncio.base_events.logger') as log: 698 run_loop() 699 log.error.assert_called_with( 700 test_utils.MockPattern('Exception in default exception.*' 701 'while handling.*in custom'), 702 exc_info=True) 703 704 # Check that original context was passed to default 705 # exception handler. 706 self.assertIn('context', _context) 707 self.assertIs(type(_context['context']['exception']), 708 ZeroDivisionError) 709 710 def test_set_task_factory_invalid(self): 711 with self.assertRaisesRegex( 712 TypeError, 'task factory must be a callable or None'): 713 714 self.loop.set_task_factory(1) 715 716 self.assertIsNone(self.loop.get_task_factory()) 717 718 def test_set_task_factory(self): 719 self.loop._process_events = mock.Mock() 720 721 class MyTask(asyncio.Task): 722 pass 723 724 async def coro(): 725 pass 726 727 factory = lambda loop, coro: MyTask(coro, loop=loop) 728 729 self.assertIsNone(self.loop.get_task_factory()) 730 self.loop.set_task_factory(factory) 731 self.assertIs(self.loop.get_task_factory(), factory) 732 733 task = self.loop.create_task(coro()) 734 self.assertTrue(isinstance(task, MyTask)) 735 self.loop.run_until_complete(task) 736 737 self.loop.set_task_factory(None) 738 self.assertIsNone(self.loop.get_task_factory()) 739 740 task = self.loop.create_task(coro()) 741 self.assertTrue(isinstance(task, asyncio.Task)) 742 self.assertFalse(isinstance(task, MyTask)) 743 self.loop.run_until_complete(task) 744 745 def test_env_var_debug(self): 746 code = '\n'.join(( 747 'import asyncio', 748 'loop = asyncio.get_event_loop()', 749 'print(loop.get_debug())')) 750 751 # Test with -E to not fail if the unit test was run with 752 # PYTHONASYNCIODEBUG set to a non-empty string 753 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 754 self.assertEqual(stdout.rstrip(), b'False') 755 756 sts, stdout, stderr = assert_python_ok('-c', code, 757 PYTHONASYNCIODEBUG='', 758 PYTHONDEVMODE='') 759 self.assertEqual(stdout.rstrip(), b'False') 760 761 sts, stdout, stderr = assert_python_ok('-c', code, 762 PYTHONASYNCIODEBUG='1', 763 PYTHONDEVMODE='') 764 self.assertEqual(stdout.rstrip(), b'True') 765 766 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 767 PYTHONASYNCIODEBUG='1') 768 self.assertEqual(stdout.rstrip(), b'False') 769 770 # -X dev 771 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 772 '-c', code) 773 self.assertEqual(stdout.rstrip(), b'True') 774 775 def test_create_task(self): 776 class MyTask(asyncio.Task): 777 pass 778 779 async def test(): 780 pass 781 782 class EventLoop(base_events.BaseEventLoop): 783 def create_task(self, coro): 784 return MyTask(coro, loop=loop) 785 786 loop = EventLoop() 787 self.set_event_loop(loop) 788 789 coro = test() 790 task = asyncio.ensure_future(coro, loop=loop) 791 self.assertIsInstance(task, MyTask) 792 793 # make warnings quiet 794 task._log_destroy_pending = False 795 coro.close() 796 797 def test_create_named_task_with_default_factory(self): 798 async def test(): 799 pass 800 801 loop = asyncio.new_event_loop() 802 task = loop.create_task(test(), name='test_task') 803 try: 804 self.assertEqual(task.get_name(), 'test_task') 805 finally: 806 loop.run_until_complete(task) 807 loop.close() 808 809 def test_create_named_task_with_custom_factory(self): 810 def task_factory(loop, coro): 811 return asyncio.Task(coro, loop=loop) 812 813 async def test(): 814 pass 815 816 loop = asyncio.new_event_loop() 817 loop.set_task_factory(task_factory) 818 task = loop.create_task(test(), name='test_task') 819 try: 820 self.assertEqual(task.get_name(), 'test_task') 821 finally: 822 loop.run_until_complete(task) 823 loop.close() 824 825 def test_run_forever_keyboard_interrupt(self): 826 # Python issue #22601: ensure that the temporary task created by 827 # run_forever() consumes the KeyboardInterrupt and so don't log 828 # a warning 829 async def raise_keyboard_interrupt(): 830 raise KeyboardInterrupt 831 832 self.loop._process_events = mock.Mock() 833 self.loop.call_exception_handler = mock.Mock() 834 835 try: 836 self.loop.run_until_complete(raise_keyboard_interrupt()) 837 except KeyboardInterrupt: 838 pass 839 self.loop.close() 840 support.gc_collect() 841 842 self.assertFalse(self.loop.call_exception_handler.called) 843 844 def test_run_until_complete_baseexception(self): 845 # Python issue #22429: run_until_complete() must not schedule a pending 846 # call to stop() if the future raised a BaseException 847 async def raise_keyboard_interrupt(): 848 raise KeyboardInterrupt 849 850 self.loop._process_events = mock.Mock() 851 852 try: 853 self.loop.run_until_complete(raise_keyboard_interrupt()) 854 except KeyboardInterrupt: 855 pass 856 857 def func(): 858 self.loop.stop() 859 func.called = True 860 func.called = False 861 try: 862 self.loop.call_soon(func) 863 self.loop.run_forever() 864 except KeyboardInterrupt: 865 pass 866 self.assertTrue(func.called) 867 868 def test_single_selecter_event_callback_after_stopping(self): 869 # Python issue #25593: A stopped event loop may cause event callbacks 870 # to run more than once. 871 event_sentinel = object() 872 callcount = 0 873 doer = None 874 875 def proc_events(event_list): 876 nonlocal doer 877 if event_sentinel in event_list: 878 doer = self.loop.call_soon(do_event) 879 880 def do_event(): 881 nonlocal callcount 882 callcount += 1 883 self.loop.call_soon(clear_selector) 884 885 def clear_selector(): 886 doer.cancel() 887 self.loop._selector.select.return_value = () 888 889 self.loop._process_events = proc_events 890 self.loop._selector.select.return_value = (event_sentinel,) 891 892 for i in range(1, 3): 893 with self.subTest('Loop %d/2' % i): 894 self.loop.call_soon(self.loop.stop) 895 self.loop.run_forever() 896 self.assertEqual(callcount, 1) 897 898 def test_run_once(self): 899 # Simple test for test_utils.run_once(). It may seem strange 900 # to have a test for this (the function isn't even used!) but 901 # it's a de-factor standard API for library tests. This tests 902 # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 903 count = 0 904 905 def callback(): 906 nonlocal count 907 count += 1 908 909 self.loop._process_events = mock.Mock() 910 self.loop.call_soon(callback) 911 test_utils.run_once(self.loop) 912 self.assertEqual(count, 1) 913 914 def test_run_forever_pre_stopped(self): 915 # Test that the old idiom for pre-stopping the loop works. 916 self.loop._process_events = mock.Mock() 917 self.loop.stop() 918 self.loop.run_forever() 919 self.loop._selector.select.assert_called_once_with(0) 920 921 async def leave_unfinalized_asyncgen(self): 922 # Create an async generator, iterate it partially, and leave it 923 # to be garbage collected. 924 # Used in async generator finalization tests. 925 # Depends on implementation details of garbage collector. Changes 926 # in gc may break this function. 927 status = {'started': False, 928 'stopped': False, 929 'finalized': False} 930 931 async def agen(): 932 status['started'] = True 933 try: 934 for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 935 yield item 936 finally: 937 status['finalized'] = True 938 939 ag = agen() 940 ai = ag.__aiter__() 941 942 async def iter_one(): 943 try: 944 item = await ai.__anext__() 945 except StopAsyncIteration: 946 return 947 if item == 'THREE': 948 status['stopped'] = True 949 return 950 asyncio.create_task(iter_one()) 951 952 asyncio.create_task(iter_one()) 953 return status 954 955 def test_asyncgen_finalization_by_gc(self): 956 # Async generators should be finalized when garbage collected. 957 self.loop._process_events = mock.Mock() 958 self.loop._write_to_self = mock.Mock() 959 with support.disable_gc(): 960 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 961 while not status['stopped']: 962 test_utils.run_briefly(self.loop) 963 self.assertTrue(status['started']) 964 self.assertTrue(status['stopped']) 965 self.assertFalse(status['finalized']) 966 support.gc_collect() 967 test_utils.run_briefly(self.loop) 968 self.assertTrue(status['finalized']) 969 970 def test_asyncgen_finalization_by_gc_in_other_thread(self): 971 # Python issue 34769: If garbage collector runs in another 972 # thread, async generators will not finalize in debug 973 # mode. 974 self.loop._process_events = mock.Mock() 975 self.loop._write_to_self = mock.Mock() 976 self.loop.set_debug(True) 977 with support.disable_gc(): 978 status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 979 while not status['stopped']: 980 test_utils.run_briefly(self.loop) 981 self.assertTrue(status['started']) 982 self.assertTrue(status['stopped']) 983 self.assertFalse(status['finalized']) 984 self.loop.run_until_complete( 985 self.loop.run_in_executor(None, support.gc_collect)) 986 test_utils.run_briefly(self.loop) 987 self.assertTrue(status['finalized']) 988 989 990class MyProto(asyncio.Protocol): 991 done = None 992 993 def __init__(self, create_future=False): 994 self.state = 'INITIAL' 995 self.nbytes = 0 996 if create_future: 997 self.done = asyncio.get_running_loop().create_future() 998 999 def connection_made(self, transport): 1000 self.transport = transport 1001 assert self.state == 'INITIAL', self.state 1002 self.state = 'CONNECTED' 1003 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 1004 1005 def data_received(self, data): 1006 assert self.state == 'CONNECTED', self.state 1007 self.nbytes += len(data) 1008 1009 def eof_received(self): 1010 assert self.state == 'CONNECTED', self.state 1011 self.state = 'EOF' 1012 1013 def connection_lost(self, exc): 1014 assert self.state in ('CONNECTED', 'EOF'), self.state 1015 self.state = 'CLOSED' 1016 if self.done: 1017 self.done.set_result(None) 1018 1019 1020class MyDatagramProto(asyncio.DatagramProtocol): 1021 done = None 1022 1023 def __init__(self, create_future=False, loop=None): 1024 self.state = 'INITIAL' 1025 self.nbytes = 0 1026 if create_future: 1027 self.done = loop.create_future() 1028 1029 def connection_made(self, transport): 1030 self.transport = transport 1031 assert self.state == 'INITIAL', self.state 1032 self.state = 'INITIALIZED' 1033 1034 def datagram_received(self, data, addr): 1035 assert self.state == 'INITIALIZED', self.state 1036 self.nbytes += len(data) 1037 1038 def error_received(self, exc): 1039 assert self.state == 'INITIALIZED', self.state 1040 1041 def connection_lost(self, exc): 1042 assert self.state == 'INITIALIZED', self.state 1043 self.state = 'CLOSED' 1044 if self.done: 1045 self.done.set_result(None) 1046 1047 1048class BaseEventLoopWithSelectorTests(test_utils.TestCase): 1049 1050 def setUp(self): 1051 super().setUp() 1052 self.loop = asyncio.SelectorEventLoop() 1053 self.set_event_loop(self.loop) 1054 1055 @mock.patch('socket.getnameinfo') 1056 def test_getnameinfo(self, m_gai): 1057 m_gai.side_effect = lambda *args: 42 1058 r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123))) 1059 self.assertEqual(r, 42) 1060 1061 @patch_socket 1062 def test_create_connection_multiple_errors(self, m_socket): 1063 1064 class MyProto(asyncio.Protocol): 1065 pass 1066 1067 async def getaddrinfo(*args, **kw): 1068 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1069 (2, 1, 6, '', ('107.6.106.82', 80))] 1070 1071 def getaddrinfo_task(*args, **kwds): 1072 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1073 1074 idx = -1 1075 errors = ['err1', 'err2'] 1076 1077 def _socket(*args, **kw): 1078 nonlocal idx, errors 1079 idx += 1 1080 raise OSError(errors[idx]) 1081 1082 m_socket.socket = _socket 1083 1084 self.loop.getaddrinfo = getaddrinfo_task 1085 1086 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1087 with self.assertRaises(OSError) as cm: 1088 self.loop.run_until_complete(coro) 1089 1090 self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') 1091 1092 @patch_socket 1093 def test_create_connection_timeout(self, m_socket): 1094 # Ensure that the socket is closed on timeout 1095 sock = mock.Mock() 1096 m_socket.socket.return_value = sock 1097 1098 def getaddrinfo(*args, **kw): 1099 fut = self.loop.create_future() 1100 addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '', 1101 ('127.0.0.1', 80)) 1102 fut.set_result([addr]) 1103 return fut 1104 self.loop.getaddrinfo = getaddrinfo 1105 1106 with mock.patch.object(self.loop, 'sock_connect', 1107 side_effect=asyncio.TimeoutError): 1108 coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) 1109 with self.assertRaises(asyncio.TimeoutError): 1110 self.loop.run_until_complete(coro) 1111 self.assertTrue(sock.close.called) 1112 1113 def test_create_connection_host_port_sock(self): 1114 coro = self.loop.create_connection( 1115 MyProto, 'example.com', 80, sock=object()) 1116 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1117 1118 def test_create_connection_wrong_sock(self): 1119 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1120 with sock: 1121 coro = self.loop.create_connection(MyProto, sock=sock) 1122 with self.assertRaisesRegex(ValueError, 1123 'A Stream Socket was expected'): 1124 self.loop.run_until_complete(coro) 1125 1126 def test_create_server_wrong_sock(self): 1127 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1128 with sock: 1129 coro = self.loop.create_server(MyProto, sock=sock) 1130 with self.assertRaisesRegex(ValueError, 1131 'A Stream Socket was expected'): 1132 self.loop.run_until_complete(coro) 1133 1134 def test_create_server_ssl_timeout_for_plain_socket(self): 1135 coro = self.loop.create_server( 1136 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1137 with self.assertRaisesRegex( 1138 ValueError, 1139 'ssl_handshake_timeout is only meaningful with ssl'): 1140 self.loop.run_until_complete(coro) 1141 1142 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 1143 'no socket.SOCK_NONBLOCK (linux only)') 1144 def test_create_server_stream_bittype(self): 1145 sock = socket.socket( 1146 socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 1147 with sock: 1148 coro = self.loop.create_server(lambda: None, sock=sock) 1149 srv = self.loop.run_until_complete(coro) 1150 srv.close() 1151 self.loop.run_until_complete(srv.wait_closed()) 1152 1153 @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support') 1154 def test_create_server_ipv6(self): 1155 async def main(): 1156 with self.assertWarns(DeprecationWarning): 1157 srv = await asyncio.start_server( 1158 lambda: None, '::1', 0, loop=self.loop) 1159 try: 1160 self.assertGreater(len(srv.sockets), 0) 1161 finally: 1162 srv.close() 1163 await srv.wait_closed() 1164 1165 try: 1166 self.loop.run_until_complete(main()) 1167 except OSError as ex: 1168 if (hasattr(errno, 'EADDRNOTAVAIL') and 1169 ex.errno == errno.EADDRNOTAVAIL): 1170 self.skipTest('failed to bind to ::1') 1171 else: 1172 raise 1173 1174 def test_create_datagram_endpoint_wrong_sock(self): 1175 sock = socket.socket(socket.AF_INET) 1176 with sock: 1177 coro = self.loop.create_datagram_endpoint(MyProto, sock=sock) 1178 with self.assertRaisesRegex(ValueError, 1179 'A UDP Socket was expected'): 1180 self.loop.run_until_complete(coro) 1181 1182 def test_create_connection_no_host_port_sock(self): 1183 coro = self.loop.create_connection(MyProto) 1184 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1185 1186 def test_create_connection_no_getaddrinfo(self): 1187 async def getaddrinfo(*args, **kw): 1188 return [] 1189 1190 def getaddrinfo_task(*args, **kwds): 1191 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1192 1193 self.loop.getaddrinfo = getaddrinfo_task 1194 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1195 self.assertRaises( 1196 OSError, self.loop.run_until_complete, coro) 1197 1198 def test_create_connection_connect_err(self): 1199 async def getaddrinfo(*args, **kw): 1200 return [(2, 1, 6, '', ('107.6.106.82', 80))] 1201 1202 def getaddrinfo_task(*args, **kwds): 1203 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1204 1205 self.loop.getaddrinfo = getaddrinfo_task 1206 self.loop.sock_connect = mock.Mock() 1207 self.loop.sock_connect.side_effect = OSError 1208 1209 coro = self.loop.create_connection(MyProto, 'example.com', 80) 1210 self.assertRaises( 1211 OSError, self.loop.run_until_complete, coro) 1212 1213 def test_create_connection_multiple(self): 1214 async def getaddrinfo(*args, **kw): 1215 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1216 (2, 1, 6, '', ('0.0.0.2', 80))] 1217 1218 def getaddrinfo_task(*args, **kwds): 1219 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1220 1221 self.loop.getaddrinfo = getaddrinfo_task 1222 self.loop.sock_connect = mock.Mock() 1223 self.loop.sock_connect.side_effect = OSError 1224 1225 coro = self.loop.create_connection( 1226 MyProto, 'example.com', 80, family=socket.AF_INET) 1227 with self.assertRaises(OSError): 1228 self.loop.run_until_complete(coro) 1229 1230 @patch_socket 1231 def test_create_connection_multiple_errors_local_addr(self, m_socket): 1232 1233 def bind(addr): 1234 if addr[0] == '0.0.0.1': 1235 err = OSError('Err') 1236 err.strerror = 'Err' 1237 raise err 1238 1239 m_socket.socket.return_value.bind = bind 1240 1241 async def getaddrinfo(*args, **kw): 1242 return [(2, 1, 6, '', ('0.0.0.1', 80)), 1243 (2, 1, 6, '', ('0.0.0.2', 80))] 1244 1245 def getaddrinfo_task(*args, **kwds): 1246 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1247 1248 self.loop.getaddrinfo = getaddrinfo_task 1249 self.loop.sock_connect = mock.Mock() 1250 self.loop.sock_connect.side_effect = OSError('Err2') 1251 1252 coro = self.loop.create_connection( 1253 MyProto, 'example.com', 80, family=socket.AF_INET, 1254 local_addr=(None, 8080)) 1255 with self.assertRaises(OSError) as cm: 1256 self.loop.run_until_complete(coro) 1257 1258 self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) 1259 self.assertTrue(m_socket.socket.return_value.close.called) 1260 1261 def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): 1262 # Test the fallback code, even if this system has inet_pton. 1263 if not allow_inet_pton: 1264 del m_socket.inet_pton 1265 1266 m_socket.getaddrinfo = socket.getaddrinfo 1267 sock = m_socket.socket.return_value 1268 1269 self.loop._add_reader = mock.Mock() 1270 self.loop._add_reader._is_coroutine = False 1271 self.loop._add_writer = mock.Mock() 1272 self.loop._add_writer._is_coroutine = False 1273 1274 coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) 1275 t, p = self.loop.run_until_complete(coro) 1276 try: 1277 sock.connect.assert_called_with(('1.2.3.4', 80)) 1278 _, kwargs = m_socket.socket.call_args 1279 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1280 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1281 finally: 1282 t.close() 1283 test_utils.run_briefly(self.loop) # allow transport to close 1284 1285 if support.IPV6_ENABLED: 1286 sock.family = socket.AF_INET6 1287 coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) 1288 t, p = self.loop.run_until_complete(coro) 1289 try: 1290 # Without inet_pton we use getaddrinfo, which transforms 1291 # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info, 1292 # scope id. 1293 [address] = sock.connect.call_args[0] 1294 host, port = address[:2] 1295 self.assertRegex(host, r'::(0\.)*1') 1296 self.assertEqual(port, 80) 1297 _, kwargs = m_socket.socket.call_args 1298 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1299 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1300 finally: 1301 t.close() 1302 test_utils.run_briefly(self.loop) # allow transport to close 1303 1304 @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support') 1305 @unittest.skipIf(sys.platform.startswith('aix'), 1306 "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX") 1307 @patch_socket 1308 def test_create_connection_ipv6_scope(self, m_socket): 1309 m_socket.getaddrinfo = socket.getaddrinfo 1310 sock = m_socket.socket.return_value 1311 sock.family = socket.AF_INET6 1312 1313 self.loop._add_reader = mock.Mock() 1314 self.loop._add_reader._is_coroutine = False 1315 self.loop._add_writer = mock.Mock() 1316 self.loop._add_writer._is_coroutine = False 1317 1318 coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80) 1319 t, p = self.loop.run_until_complete(coro) 1320 try: 1321 sock.connect.assert_called_with(('fe80::1', 80, 0, 1)) 1322 _, kwargs = m_socket.socket.call_args 1323 self.assertEqual(kwargs['family'], m_socket.AF_INET6) 1324 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1325 finally: 1326 t.close() 1327 test_utils.run_briefly(self.loop) # allow transport to close 1328 1329 @patch_socket 1330 def test_create_connection_ip_addr(self, m_socket): 1331 self._test_create_connection_ip_addr(m_socket, True) 1332 1333 @patch_socket 1334 def test_create_connection_no_inet_pton(self, m_socket): 1335 self._test_create_connection_ip_addr(m_socket, False) 1336 1337 @patch_socket 1338 def test_create_connection_service_name(self, m_socket): 1339 m_socket.getaddrinfo = socket.getaddrinfo 1340 sock = m_socket.socket.return_value 1341 1342 self.loop._add_reader = mock.Mock() 1343 self.loop._add_reader._is_coroutine = False 1344 self.loop._add_writer = mock.Mock() 1345 self.loop._add_writer._is_coroutine = False 1346 1347 for service, port in ('http', 80), (b'http', 80): 1348 coro = self.loop.create_connection(asyncio.Protocol, 1349 '127.0.0.1', service) 1350 1351 t, p = self.loop.run_until_complete(coro) 1352 try: 1353 sock.connect.assert_called_with(('127.0.0.1', port)) 1354 _, kwargs = m_socket.socket.call_args 1355 self.assertEqual(kwargs['family'], m_socket.AF_INET) 1356 self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 1357 finally: 1358 t.close() 1359 test_utils.run_briefly(self.loop) # allow transport to close 1360 1361 for service in 'nonsense', b'nonsense': 1362 coro = self.loop.create_connection(asyncio.Protocol, 1363 '127.0.0.1', service) 1364 1365 with self.assertRaises(OSError): 1366 self.loop.run_until_complete(coro) 1367 1368 def test_create_connection_no_local_addr(self): 1369 async def getaddrinfo(host, *args, **kw): 1370 if host == 'example.com': 1371 return [(2, 1, 6, '', ('107.6.106.82', 80)), 1372 (2, 1, 6, '', ('107.6.106.82', 80))] 1373 else: 1374 return [] 1375 1376 def getaddrinfo_task(*args, **kwds): 1377 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1378 self.loop.getaddrinfo = getaddrinfo_task 1379 1380 coro = self.loop.create_connection( 1381 MyProto, 'example.com', 80, family=socket.AF_INET, 1382 local_addr=(None, 8080)) 1383 self.assertRaises( 1384 OSError, self.loop.run_until_complete, coro) 1385 1386 @patch_socket 1387 def test_create_connection_bluetooth(self, m_socket): 1388 # See http://bugs.python.org/issue27136, fallback to getaddrinfo when 1389 # we can't recognize an address is resolved, e.g. a Bluetooth address. 1390 addr = ('00:01:02:03:04:05', 1) 1391 1392 def getaddrinfo(host, port, *args, **kw): 1393 assert (host, port) == addr 1394 return [(999, 1, 999, '', (addr, 1))] 1395 1396 m_socket.getaddrinfo = getaddrinfo 1397 sock = m_socket.socket() 1398 coro = self.loop.sock_connect(sock, addr) 1399 self.loop.run_until_complete(coro) 1400 1401 def test_create_connection_ssl_server_hostname_default(self): 1402 self.loop.getaddrinfo = mock.Mock() 1403 1404 def mock_getaddrinfo(*args, **kwds): 1405 f = self.loop.create_future() 1406 f.set_result([(socket.AF_INET, socket.SOCK_STREAM, 1407 socket.SOL_TCP, '', ('1.2.3.4', 80))]) 1408 return f 1409 1410 self.loop.getaddrinfo.side_effect = mock_getaddrinfo 1411 self.loop.sock_connect = mock.Mock() 1412 self.loop.sock_connect.return_value = self.loop.create_future() 1413 self.loop.sock_connect.return_value.set_result(None) 1414 self.loop._make_ssl_transport = mock.Mock() 1415 1416 class _SelectorTransportMock: 1417 _sock = None 1418 1419 def get_extra_info(self, key): 1420 return mock.Mock() 1421 1422 def close(self): 1423 self._sock.close() 1424 1425 def mock_make_ssl_transport(sock, protocol, sslcontext, waiter, 1426 **kwds): 1427 waiter.set_result(None) 1428 transport = _SelectorTransportMock() 1429 transport._sock = sock 1430 return transport 1431 1432 self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport 1433 ANY = mock.ANY 1434 handshake_timeout = object() 1435 # First try the default server_hostname. 1436 self.loop._make_ssl_transport.reset_mock() 1437 coro = self.loop.create_connection( 1438 MyProto, 'python.org', 80, ssl=True, 1439 ssl_handshake_timeout=handshake_timeout) 1440 transport, _ = self.loop.run_until_complete(coro) 1441 transport.close() 1442 self.loop._make_ssl_transport.assert_called_with( 1443 ANY, ANY, ANY, ANY, 1444 server_side=False, 1445 server_hostname='python.org', 1446 ssl_handshake_timeout=handshake_timeout) 1447 # Next try an explicit server_hostname. 1448 self.loop._make_ssl_transport.reset_mock() 1449 coro = self.loop.create_connection( 1450 MyProto, 'python.org', 80, ssl=True, 1451 server_hostname='perl.com', 1452 ssl_handshake_timeout=handshake_timeout) 1453 transport, _ = self.loop.run_until_complete(coro) 1454 transport.close() 1455 self.loop._make_ssl_transport.assert_called_with( 1456 ANY, ANY, ANY, ANY, 1457 server_side=False, 1458 server_hostname='perl.com', 1459 ssl_handshake_timeout=handshake_timeout) 1460 # Finally try an explicit empty server_hostname. 1461 self.loop._make_ssl_transport.reset_mock() 1462 coro = self.loop.create_connection( 1463 MyProto, 'python.org', 80, ssl=True, 1464 server_hostname='', 1465 ssl_handshake_timeout=handshake_timeout) 1466 transport, _ = self.loop.run_until_complete(coro) 1467 transport.close() 1468 self.loop._make_ssl_transport.assert_called_with( 1469 ANY, ANY, ANY, ANY, 1470 server_side=False, 1471 server_hostname='', 1472 ssl_handshake_timeout=handshake_timeout) 1473 1474 def test_create_connection_no_ssl_server_hostname_errors(self): 1475 # When not using ssl, server_hostname must be None. 1476 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1477 server_hostname='') 1478 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1479 coro = self.loop.create_connection(MyProto, 'python.org', 80, 1480 server_hostname='python.org') 1481 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1482 1483 def test_create_connection_ssl_server_hostname_errors(self): 1484 # When using ssl, server_hostname may be None if host is non-empty. 1485 coro = self.loop.create_connection(MyProto, '', 80, ssl=True) 1486 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1487 coro = self.loop.create_connection(MyProto, None, 80, ssl=True) 1488 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1489 sock = socket.socket() 1490 coro = self.loop.create_connection(MyProto, None, None, 1491 ssl=True, sock=sock) 1492 self.addCleanup(sock.close) 1493 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1494 1495 def test_create_connection_ssl_timeout_for_plain_socket(self): 1496 coro = self.loop.create_connection( 1497 MyProto, 'example.com', 80, ssl_handshake_timeout=1) 1498 with self.assertRaisesRegex( 1499 ValueError, 1500 'ssl_handshake_timeout is only meaningful with ssl'): 1501 self.loop.run_until_complete(coro) 1502 1503 def test_create_server_empty_host(self): 1504 # if host is empty string use None instead 1505 host = object() 1506 1507 async def getaddrinfo(*args, **kw): 1508 nonlocal host 1509 host = args[0] 1510 return [] 1511 1512 def getaddrinfo_task(*args, **kwds): 1513 return self.loop.create_task(getaddrinfo(*args, **kwds)) 1514 1515 self.loop.getaddrinfo = getaddrinfo_task 1516 fut = self.loop.create_server(MyProto, '', 0) 1517 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1518 self.assertIsNone(host) 1519 1520 def test_create_server_host_port_sock(self): 1521 fut = self.loop.create_server( 1522 MyProto, '0.0.0.0', 0, sock=object()) 1523 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1524 1525 def test_create_server_no_host_port_sock(self): 1526 fut = self.loop.create_server(MyProto) 1527 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1528 1529 def test_create_server_no_getaddrinfo(self): 1530 getaddrinfo = self.loop.getaddrinfo = mock.Mock() 1531 getaddrinfo.return_value = self.loop.create_future() 1532 getaddrinfo.return_value.set_result(None) 1533 1534 f = self.loop.create_server(MyProto, 'python.org', 0) 1535 self.assertRaises(OSError, self.loop.run_until_complete, f) 1536 1537 @patch_socket 1538 def test_create_server_nosoreuseport(self, m_socket): 1539 m_socket.getaddrinfo = socket.getaddrinfo 1540 del m_socket.SO_REUSEPORT 1541 m_socket.socket.return_value = mock.Mock() 1542 1543 f = self.loop.create_server( 1544 MyProto, '0.0.0.0', 0, reuse_port=True) 1545 1546 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1547 1548 @patch_socket 1549 def test_create_server_soreuseport_only_defined(self, m_socket): 1550 m_socket.getaddrinfo = socket.getaddrinfo 1551 m_socket.socket.return_value = mock.Mock() 1552 m_socket.SO_REUSEPORT = -1 1553 1554 f = self.loop.create_server( 1555 MyProto, '0.0.0.0', 0, reuse_port=True) 1556 1557 self.assertRaises(ValueError, self.loop.run_until_complete, f) 1558 1559 @patch_socket 1560 def test_create_server_cant_bind(self, m_socket): 1561 1562 class Err(OSError): 1563 strerror = 'error' 1564 1565 m_socket.getaddrinfo.return_value = [ 1566 (2, 1, 6, '', ('127.0.0.1', 10100))] 1567 m_socket.getaddrinfo._is_coroutine = False 1568 m_sock = m_socket.socket.return_value = mock.Mock() 1569 m_sock.bind.side_effect = Err 1570 1571 fut = self.loop.create_server(MyProto, '0.0.0.0', 0) 1572 self.assertRaises(OSError, self.loop.run_until_complete, fut) 1573 self.assertTrue(m_sock.close.called) 1574 1575 @patch_socket 1576 def test_create_datagram_endpoint_no_addrinfo(self, m_socket): 1577 m_socket.getaddrinfo.return_value = [] 1578 m_socket.getaddrinfo._is_coroutine = False 1579 1580 coro = self.loop.create_datagram_endpoint( 1581 MyDatagramProto, local_addr=('localhost', 0)) 1582 self.assertRaises( 1583 OSError, self.loop.run_until_complete, coro) 1584 1585 def test_create_datagram_endpoint_addr_error(self): 1586 coro = self.loop.create_datagram_endpoint( 1587 MyDatagramProto, local_addr='localhost') 1588 self.assertRaises( 1589 AssertionError, self.loop.run_until_complete, coro) 1590 coro = self.loop.create_datagram_endpoint( 1591 MyDatagramProto, local_addr=('localhost', 1, 2, 3)) 1592 self.assertRaises( 1593 AssertionError, self.loop.run_until_complete, coro) 1594 1595 def test_create_datagram_endpoint_connect_err(self): 1596 self.loop.sock_connect = mock.Mock() 1597 self.loop.sock_connect.side_effect = OSError 1598 1599 coro = self.loop.create_datagram_endpoint( 1600 asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) 1601 self.assertRaises( 1602 OSError, self.loop.run_until_complete, coro) 1603 1604 def test_create_datagram_endpoint_allow_broadcast(self): 1605 protocol = MyDatagramProto(create_future=True, loop=self.loop) 1606 self.loop.sock_connect = sock_connect = mock.Mock() 1607 sock_connect.return_value = [] 1608 1609 coro = self.loop.create_datagram_endpoint( 1610 lambda: protocol, 1611 remote_addr=('127.0.0.1', 0), 1612 allow_broadcast=True) 1613 1614 transport, _ = self.loop.run_until_complete(coro) 1615 self.assertFalse(sock_connect.called) 1616 1617 transport.close() 1618 self.loop.run_until_complete(protocol.done) 1619 self.assertEqual('CLOSED', protocol.state) 1620 1621 @patch_socket 1622 def test_create_datagram_endpoint_socket_err(self, m_socket): 1623 m_socket.getaddrinfo = socket.getaddrinfo 1624 m_socket.socket.side_effect = OSError 1625 1626 coro = self.loop.create_datagram_endpoint( 1627 asyncio.DatagramProtocol, family=socket.AF_INET) 1628 self.assertRaises( 1629 OSError, self.loop.run_until_complete, coro) 1630 1631 coro = self.loop.create_datagram_endpoint( 1632 asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0)) 1633 self.assertRaises( 1634 OSError, self.loop.run_until_complete, coro) 1635 1636 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 1637 def test_create_datagram_endpoint_no_matching_family(self): 1638 coro = self.loop.create_datagram_endpoint( 1639 asyncio.DatagramProtocol, 1640 remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) 1641 self.assertRaises( 1642 ValueError, self.loop.run_until_complete, coro) 1643 1644 @patch_socket 1645 def test_create_datagram_endpoint_setblk_err(self, m_socket): 1646 m_socket.socket.return_value.setblocking.side_effect = OSError 1647 1648 coro = self.loop.create_datagram_endpoint( 1649 asyncio.DatagramProtocol, family=socket.AF_INET) 1650 self.assertRaises( 1651 OSError, self.loop.run_until_complete, coro) 1652 self.assertTrue( 1653 m_socket.socket.return_value.close.called) 1654 1655 def test_create_datagram_endpoint_noaddr_nofamily(self): 1656 coro = self.loop.create_datagram_endpoint( 1657 asyncio.DatagramProtocol) 1658 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1659 1660 @patch_socket 1661 def test_create_datagram_endpoint_cant_bind(self, m_socket): 1662 class Err(OSError): 1663 pass 1664 1665 m_socket.getaddrinfo = socket.getaddrinfo 1666 m_sock = m_socket.socket.return_value = mock.Mock() 1667 m_sock.bind.side_effect = Err 1668 1669 fut = self.loop.create_datagram_endpoint( 1670 MyDatagramProto, 1671 local_addr=('127.0.0.1', 0), family=socket.AF_INET) 1672 self.assertRaises(Err, self.loop.run_until_complete, fut) 1673 self.assertTrue(m_sock.close.called) 1674 1675 def test_create_datagram_endpoint_sock(self): 1676 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1677 sock.bind(('127.0.0.1', 0)) 1678 fut = self.loop.create_datagram_endpoint( 1679 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1680 sock=sock) 1681 transport, protocol = self.loop.run_until_complete(fut) 1682 transport.close() 1683 self.loop.run_until_complete(protocol.done) 1684 self.assertEqual('CLOSED', protocol.state) 1685 1686 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 1687 def test_create_datagram_endpoint_sock_unix(self): 1688 fut = self.loop.create_datagram_endpoint( 1689 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1690 family=socket.AF_UNIX) 1691 transport, protocol = self.loop.run_until_complete(fut) 1692 assert transport._sock.family == socket.AF_UNIX 1693 transport.close() 1694 self.loop.run_until_complete(protocol.done) 1695 self.assertEqual('CLOSED', protocol.state) 1696 1697 @support.skip_unless_bind_unix_socket 1698 def test_create_datagram_endpoint_existing_sock_unix(self): 1699 with test_utils.unix_socket_path() as path: 1700 sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) 1701 sock.bind(path) 1702 sock.close() 1703 1704 coro = self.loop.create_datagram_endpoint( 1705 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1706 path, family=socket.AF_UNIX) 1707 transport, protocol = self.loop.run_until_complete(coro) 1708 transport.close() 1709 self.loop.run_until_complete(protocol.done) 1710 1711 def test_create_datagram_endpoint_sock_sockopts(self): 1712 class FakeSock: 1713 type = socket.SOCK_DGRAM 1714 1715 fut = self.loop.create_datagram_endpoint( 1716 MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) 1717 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1718 1719 fut = self.loop.create_datagram_endpoint( 1720 MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) 1721 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1722 1723 fut = self.loop.create_datagram_endpoint( 1724 MyDatagramProto, family=1, sock=FakeSock()) 1725 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1726 1727 fut = self.loop.create_datagram_endpoint( 1728 MyDatagramProto, proto=1, sock=FakeSock()) 1729 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1730 1731 fut = self.loop.create_datagram_endpoint( 1732 MyDatagramProto, flags=1, sock=FakeSock()) 1733 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1734 1735 fut = self.loop.create_datagram_endpoint( 1736 MyDatagramProto, reuse_port=True, sock=FakeSock()) 1737 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1738 1739 fut = self.loop.create_datagram_endpoint( 1740 MyDatagramProto, allow_broadcast=True, sock=FakeSock()) 1741 self.assertRaises(ValueError, self.loop.run_until_complete, fut) 1742 1743 def test_create_datagram_endpoint_sockopts(self): 1744 # Socket options should not be applied unless asked for. 1745 # SO_REUSEPORT is not available on all platforms. 1746 1747 coro = self.loop.create_datagram_endpoint( 1748 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1749 local_addr=('127.0.0.1', 0)) 1750 transport, protocol = self.loop.run_until_complete(coro) 1751 sock = transport.get_extra_info('socket') 1752 1753 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1754 1755 if reuseport_supported: 1756 self.assertFalse( 1757 sock.getsockopt( 1758 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1759 self.assertFalse( 1760 sock.getsockopt( 1761 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1762 1763 transport.close() 1764 self.loop.run_until_complete(protocol.done) 1765 self.assertEqual('CLOSED', protocol.state) 1766 1767 coro = self.loop.create_datagram_endpoint( 1768 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1769 local_addr=('127.0.0.1', 0), 1770 reuse_port=reuseport_supported, 1771 allow_broadcast=True) 1772 transport, protocol = self.loop.run_until_complete(coro) 1773 sock = transport.get_extra_info('socket') 1774 1775 self.assertFalse( 1776 sock.getsockopt( 1777 socket.SOL_SOCKET, socket.SO_REUSEADDR)) 1778 if reuseport_supported: 1779 self.assertTrue( 1780 sock.getsockopt( 1781 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 1782 self.assertTrue( 1783 sock.getsockopt( 1784 socket.SOL_SOCKET, socket.SO_BROADCAST)) 1785 1786 transport.close() 1787 self.loop.run_until_complete(protocol.done) 1788 self.assertEqual('CLOSED', protocol.state) 1789 1790 def test_create_datagram_endpoint_reuse_address_error(self): 1791 # bpo-37228: Ensure that explicit passing of `reuse_address=True` 1792 # raises an error, as it is not safe to use SO_REUSEADDR when using UDP 1793 1794 coro = self.loop.create_datagram_endpoint( 1795 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1796 local_addr=('127.0.0.1', 0), 1797 reuse_address=True) 1798 1799 with self.assertRaises(ValueError): 1800 self.loop.run_until_complete(coro) 1801 1802 def test_create_datagram_endpoint_reuse_address_warning(self): 1803 # bpo-37228: Deprecate *reuse_address* parameter 1804 1805 coro = self.loop.create_datagram_endpoint( 1806 lambda: MyDatagramProto(create_future=True, loop=self.loop), 1807 local_addr=('127.0.0.1', 0), 1808 reuse_address=False) 1809 1810 with self.assertWarns(DeprecationWarning): 1811 transport, protocol = self.loop.run_until_complete(coro) 1812 transport.close() 1813 self.loop.run_until_complete(protocol.done) 1814 self.assertEqual('CLOSED', protocol.state) 1815 1816 @patch_socket 1817 def test_create_datagram_endpoint_nosoreuseport(self, m_socket): 1818 del m_socket.SO_REUSEPORT 1819 m_socket.socket.return_value = mock.Mock() 1820 1821 coro = self.loop.create_datagram_endpoint( 1822 lambda: MyDatagramProto(loop=self.loop), 1823 local_addr=('127.0.0.1', 0), 1824 reuse_port=True) 1825 1826 self.assertRaises(ValueError, self.loop.run_until_complete, coro) 1827 1828 @patch_socket 1829 def test_create_datagram_endpoint_ip_addr(self, m_socket): 1830 def getaddrinfo(*args, **kw): 1831 self.fail('should not have called getaddrinfo') 1832 1833 m_socket.getaddrinfo = getaddrinfo 1834 m_socket.socket.return_value.bind = bind = mock.Mock() 1835 self.loop._add_reader = mock.Mock() 1836 self.loop._add_reader._is_coroutine = False 1837 1838 reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 1839 coro = self.loop.create_datagram_endpoint( 1840 lambda: MyDatagramProto(loop=self.loop), 1841 local_addr=('1.2.3.4', 0), 1842 reuse_port=reuseport_supported) 1843 1844 t, p = self.loop.run_until_complete(coro) 1845 try: 1846 bind.assert_called_with(('1.2.3.4', 0)) 1847 m_socket.socket.assert_called_with(family=m_socket.AF_INET, 1848 proto=m_socket.IPPROTO_UDP, 1849 type=m_socket.SOCK_DGRAM) 1850 finally: 1851 t.close() 1852 test_utils.run_briefly(self.loop) # allow transport to close 1853 1854 def test_accept_connection_retry(self): 1855 sock = mock.Mock() 1856 sock.accept.side_effect = BlockingIOError() 1857 1858 self.loop._accept_connection(MyProto, sock) 1859 self.assertFalse(sock.close.called) 1860 1861 @mock.patch('asyncio.base_events.logger') 1862 def test_accept_connection_exception(self, m_log): 1863 sock = mock.Mock() 1864 sock.fileno.return_value = 10 1865 sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') 1866 self.loop._remove_reader = mock.Mock() 1867 self.loop.call_later = mock.Mock() 1868 1869 self.loop._accept_connection(MyProto, sock) 1870 self.assertTrue(m_log.error.called) 1871 self.assertFalse(sock.close.called) 1872 self.loop._remove_reader.assert_called_with(10) 1873 self.loop.call_later.assert_called_with( 1874 constants.ACCEPT_RETRY_DELAY, 1875 # self.loop._start_serving 1876 mock.ANY, 1877 MyProto, sock, None, None, mock.ANY, mock.ANY) 1878 1879 def test_call_coroutine(self): 1880 with self.assertWarns(DeprecationWarning): 1881 @asyncio.coroutine 1882 def simple_coroutine(): 1883 pass 1884 1885 self.loop.set_debug(True) 1886 coro_func = simple_coroutine 1887 coro_obj = coro_func() 1888 self.addCleanup(coro_obj.close) 1889 for func in (coro_func, coro_obj): 1890 with self.assertRaises(TypeError): 1891 self.loop.call_soon(func) 1892 with self.assertRaises(TypeError): 1893 self.loop.call_soon_threadsafe(func) 1894 with self.assertRaises(TypeError): 1895 self.loop.call_later(60, func) 1896 with self.assertRaises(TypeError): 1897 self.loop.call_at(self.loop.time() + 60, func) 1898 with self.assertRaises(TypeError): 1899 self.loop.run_until_complete( 1900 self.loop.run_in_executor(None, func)) 1901 1902 @mock.patch('asyncio.base_events.logger') 1903 def test_log_slow_callbacks(self, m_logger): 1904 def stop_loop_cb(loop): 1905 loop.stop() 1906 1907 async def stop_loop_coro(loop): 1908 loop.stop() 1909 1910 asyncio.set_event_loop(self.loop) 1911 self.loop.set_debug(True) 1912 self.loop.slow_callback_duration = 0.0 1913 1914 # slow callback 1915 self.loop.call_soon(stop_loop_cb, self.loop) 1916 self.loop.run_forever() 1917 fmt, *args = m_logger.warning.call_args[0] 1918 self.assertRegex(fmt % tuple(args), 1919 "^Executing <Handle.*stop_loop_cb.*> " 1920 "took .* seconds$") 1921 1922 # slow task 1923 asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop) 1924 self.loop.run_forever() 1925 fmt, *args = m_logger.warning.call_args[0] 1926 self.assertRegex(fmt % tuple(args), 1927 "^Executing <Task.*stop_loop_coro.*> " 1928 "took .* seconds$") 1929 1930 1931class RunningLoopTests(unittest.TestCase): 1932 1933 def test_running_loop_within_a_loop(self): 1934 async def runner(loop): 1935 loop.run_forever() 1936 1937 loop = asyncio.new_event_loop() 1938 outer_loop = asyncio.new_event_loop() 1939 try: 1940 with self.assertRaisesRegex(RuntimeError, 1941 'while another loop is running'): 1942 outer_loop.run_until_complete(runner(loop)) 1943 finally: 1944 loop.close() 1945 outer_loop.close() 1946 1947 1948class BaseLoopSockSendfileTests(test_utils.TestCase): 1949 1950 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 1951 1952 class MyProto(asyncio.Protocol): 1953 1954 def __init__(self, loop): 1955 self.started = False 1956 self.closed = False 1957 self.data = bytearray() 1958 self.fut = loop.create_future() 1959 self.transport = None 1960 1961 def connection_made(self, transport): 1962 self.started = True 1963 self.transport = transport 1964 1965 def data_received(self, data): 1966 self.data.extend(data) 1967 1968 def connection_lost(self, exc): 1969 self.closed = True 1970 self.fut.set_result(None) 1971 self.transport = None 1972 1973 async def wait_closed(self): 1974 await self.fut 1975 1976 @classmethod 1977 def setUpClass(cls): 1978 cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE 1979 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 1980 with open(support.TESTFN, 'wb') as fp: 1981 fp.write(cls.DATA) 1982 super().setUpClass() 1983 1984 @classmethod 1985 def tearDownClass(cls): 1986 constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize 1987 support.unlink(support.TESTFN) 1988 super().tearDownClass() 1989 1990 def setUp(self): 1991 from asyncio.selector_events import BaseSelectorEventLoop 1992 # BaseSelectorEventLoop() has no native implementation 1993 self.loop = BaseSelectorEventLoop() 1994 self.set_event_loop(self.loop) 1995 self.file = open(support.TESTFN, 'rb') 1996 self.addCleanup(self.file.close) 1997 super().setUp() 1998 1999 def make_socket(self, blocking=False): 2000 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 2001 sock.setblocking(blocking) 2002 self.addCleanup(sock.close) 2003 return sock 2004 2005 def run_loop(self, coro): 2006 return self.loop.run_until_complete(coro) 2007 2008 def prepare(self): 2009 sock = self.make_socket() 2010 proto = self.MyProto(self.loop) 2011 server = self.run_loop(self.loop.create_server( 2012 lambda: proto, support.HOST, 0, family=socket.AF_INET)) 2013 addr = server.sockets[0].getsockname() 2014 2015 for _ in range(10): 2016 try: 2017 self.run_loop(self.loop.sock_connect(sock, addr)) 2018 except OSError: 2019 self.run_loop(asyncio.sleep(0.5)) 2020 continue 2021 else: 2022 break 2023 else: 2024 # One last try, so we get the exception 2025 self.run_loop(self.loop.sock_connect(sock, addr)) 2026 2027 def cleanup(): 2028 server.close() 2029 self.run_loop(server.wait_closed()) 2030 sock.close() 2031 if proto.transport is not None: 2032 proto.transport.close() 2033 self.run_loop(proto.wait_closed()) 2034 2035 self.addCleanup(cleanup) 2036 2037 return sock, proto 2038 2039 def test__sock_sendfile_native_failure(self): 2040 sock, proto = self.prepare() 2041 2042 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2043 "sendfile is not available"): 2044 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 2045 0, None)) 2046 2047 self.assertEqual(proto.data, b'') 2048 self.assertEqual(self.file.tell(), 0) 2049 2050 def test_sock_sendfile_no_fallback(self): 2051 sock, proto = self.prepare() 2052 2053 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 2054 "sendfile is not available"): 2055 self.run_loop(self.loop.sock_sendfile(sock, self.file, 2056 fallback=False)) 2057 2058 self.assertEqual(self.file.tell(), 0) 2059 self.assertEqual(proto.data, b'') 2060 2061 def test_sock_sendfile_fallback(self): 2062 sock, proto = self.prepare() 2063 2064 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2065 sock.close() 2066 self.run_loop(proto.wait_closed()) 2067 2068 self.assertEqual(ret, len(self.DATA)) 2069 self.assertEqual(self.file.tell(), len(self.DATA)) 2070 self.assertEqual(proto.data, self.DATA) 2071 2072 def test_sock_sendfile_fallback_offset_and_count(self): 2073 sock, proto = self.prepare() 2074 2075 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, 2076 1000, 2000)) 2077 sock.close() 2078 self.run_loop(proto.wait_closed()) 2079 2080 self.assertEqual(ret, 2000) 2081 self.assertEqual(self.file.tell(), 3000) 2082 self.assertEqual(proto.data, self.DATA[1000:3000]) 2083 2084 def test_blocking_socket(self): 2085 self.loop.set_debug(True) 2086 sock = self.make_socket(blocking=True) 2087 with self.assertRaisesRegex(ValueError, "must be non-blocking"): 2088 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2089 2090 def test_nonbinary_file(self): 2091 sock = self.make_socket() 2092 with open(support.TESTFN, 'r') as f: 2093 with self.assertRaisesRegex(ValueError, "binary mode"): 2094 self.run_loop(self.loop.sock_sendfile(sock, f)) 2095 2096 def test_nonstream_socket(self): 2097 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 2098 sock.setblocking(False) 2099 self.addCleanup(sock.close) 2100 with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"): 2101 self.run_loop(self.loop.sock_sendfile(sock, self.file)) 2102 2103 def test_notint_count(self): 2104 sock = self.make_socket() 2105 with self.assertRaisesRegex(TypeError, 2106 "count must be a positive integer"): 2107 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count')) 2108 2109 def test_negative_count(self): 2110 sock = self.make_socket() 2111 with self.assertRaisesRegex(ValueError, 2112 "count must be a positive integer"): 2113 self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1)) 2114 2115 def test_notint_offset(self): 2116 sock = self.make_socket() 2117 with self.assertRaisesRegex(TypeError, 2118 "offset must be a non-negative integer"): 2119 self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset')) 2120 2121 def test_negative_offset(self): 2122 sock = self.make_socket() 2123 with self.assertRaisesRegex(ValueError, 2124 "offset must be a non-negative integer"): 2125 self.run_loop(self.loop.sock_sendfile(sock, self.file, -1)) 2126 2127 2128class TestSelectorUtils(test_utils.TestCase): 2129 def check_set_nodelay(self, sock): 2130 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2131 self.assertFalse(opt) 2132 2133 base_events._set_nodelay(sock) 2134 2135 opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 2136 self.assertTrue(opt) 2137 2138 @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 2139 'need socket.TCP_NODELAY') 2140 def test_set_nodelay(self): 2141 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2142 proto=socket.IPPROTO_TCP) 2143 with sock: 2144 self.check_set_nodelay(sock) 2145 2146 sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 2147 proto=socket.IPPROTO_TCP) 2148 with sock: 2149 sock.setblocking(False) 2150 self.check_set_nodelay(sock) 2151 2152 2153 2154if __name__ == '__main__': 2155 unittest.main() 2156