1"""Tests for events.py.""" 2 3import collections.abc 4import concurrent.futures 5import functools 6import io 7import os 8import platform 9import re 10import signal 11import socket 12try: 13 import ssl 14except ImportError: 15 ssl = None 16import subprocess 17import sys 18import threading 19import time 20import errno 21import unittest 22from unittest import mock 23import weakref 24 25if sys.platform != 'win32': 26 import tty 27 28import asyncio 29from asyncio import coroutines 30from asyncio import events 31from asyncio import proactor_events 32from asyncio import selector_events 33from test.test_asyncio import utils as test_utils 34from test import support 35 36 37def tearDownModule(): 38 asyncio.set_event_loop_policy(None) 39 40 41def broken_unix_getsockname(): 42 """Return True if the platform is Mac OS 10.4 or older.""" 43 if sys.platform.startswith("aix"): 44 return True 45 elif sys.platform != 'darwin': 46 return False 47 version = platform.mac_ver()[0] 48 version = tuple(map(int, version.split('.'))) 49 return version < (10, 5) 50 51 52def _test_get_event_loop_new_process__sub_proc(): 53 async def doit(): 54 return 'hello' 55 56 loop = asyncio.new_event_loop() 57 asyncio.set_event_loop(loop) 58 return loop.run_until_complete(doit()) 59 60 61class CoroLike: 62 def send(self, v): 63 pass 64 65 def throw(self, *exc): 66 pass 67 68 def close(self): 69 pass 70 71 def __await__(self): 72 pass 73 74 75class MyBaseProto(asyncio.Protocol): 76 connected = None 77 done = None 78 79 def __init__(self, loop=None): 80 self.transport = None 81 self.state = 'INITIAL' 82 self.nbytes = 0 83 if loop is not None: 84 self.connected = loop.create_future() 85 self.done = loop.create_future() 86 87 def connection_made(self, transport): 88 self.transport = transport 89 assert self.state == 'INITIAL', self.state 90 self.state = 'CONNECTED' 91 if self.connected: 92 self.connected.set_result(None) 93 94 def data_received(self, data): 95 assert self.state == 'CONNECTED', self.state 96 self.nbytes += len(data) 97 98 def eof_received(self): 99 assert self.state == 'CONNECTED', self.state 100 self.state = 'EOF' 101 102 def connection_lost(self, exc): 103 assert self.state in ('CONNECTED', 'EOF'), self.state 104 self.state = 'CLOSED' 105 if self.done: 106 self.done.set_result(None) 107 108 109class MyProto(MyBaseProto): 110 def connection_made(self, transport): 111 super().connection_made(transport) 112 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 113 114 115class MyDatagramProto(asyncio.DatagramProtocol): 116 done = None 117 118 def __init__(self, loop=None): 119 self.state = 'INITIAL' 120 self.nbytes = 0 121 if loop is not None: 122 self.done = loop.create_future() 123 124 def connection_made(self, transport): 125 self.transport = transport 126 assert self.state == 'INITIAL', self.state 127 self.state = 'INITIALIZED' 128 129 def datagram_received(self, data, addr): 130 assert self.state == 'INITIALIZED', self.state 131 self.nbytes += len(data) 132 133 def error_received(self, exc): 134 assert self.state == 'INITIALIZED', self.state 135 136 def connection_lost(self, exc): 137 assert self.state == 'INITIALIZED', self.state 138 self.state = 'CLOSED' 139 if self.done: 140 self.done.set_result(None) 141 142 143class MyReadPipeProto(asyncio.Protocol): 144 done = None 145 146 def __init__(self, loop=None): 147 self.state = ['INITIAL'] 148 self.nbytes = 0 149 self.transport = None 150 if loop is not None: 151 self.done = loop.create_future() 152 153 def connection_made(self, transport): 154 self.transport = transport 155 assert self.state == ['INITIAL'], self.state 156 self.state.append('CONNECTED') 157 158 def data_received(self, data): 159 assert self.state == ['INITIAL', 'CONNECTED'], self.state 160 self.nbytes += len(data) 161 162 def eof_received(self): 163 assert self.state == ['INITIAL', 'CONNECTED'], self.state 164 self.state.append('EOF') 165 166 def connection_lost(self, exc): 167 if 'EOF' not in self.state: 168 self.state.append('EOF') # It is okay if EOF is missed. 169 assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state 170 self.state.append('CLOSED') 171 if self.done: 172 self.done.set_result(None) 173 174 175class MyWritePipeProto(asyncio.BaseProtocol): 176 done = None 177 178 def __init__(self, loop=None): 179 self.state = 'INITIAL' 180 self.transport = None 181 if loop is not None: 182 self.done = loop.create_future() 183 184 def connection_made(self, transport): 185 self.transport = transport 186 assert self.state == 'INITIAL', self.state 187 self.state = 'CONNECTED' 188 189 def connection_lost(self, exc): 190 assert self.state == 'CONNECTED', self.state 191 self.state = 'CLOSED' 192 if self.done: 193 self.done.set_result(None) 194 195 196class MySubprocessProtocol(asyncio.SubprocessProtocol): 197 198 def __init__(self, loop): 199 self.state = 'INITIAL' 200 self.transport = None 201 self.connected = loop.create_future() 202 self.completed = loop.create_future() 203 self.disconnects = {fd: loop.create_future() for fd in range(3)} 204 self.data = {1: b'', 2: b''} 205 self.returncode = None 206 self.got_data = {1: asyncio.Event(loop=loop), 207 2: asyncio.Event(loop=loop)} 208 209 def connection_made(self, transport): 210 self.transport = transport 211 assert self.state == 'INITIAL', self.state 212 self.state = 'CONNECTED' 213 self.connected.set_result(None) 214 215 def connection_lost(self, exc): 216 assert self.state == 'CONNECTED', self.state 217 self.state = 'CLOSED' 218 self.completed.set_result(None) 219 220 def pipe_data_received(self, fd, data): 221 assert self.state == 'CONNECTED', self.state 222 self.data[fd] += data 223 self.got_data[fd].set() 224 225 def pipe_connection_lost(self, fd, exc): 226 assert self.state == 'CONNECTED', self.state 227 if exc: 228 self.disconnects[fd].set_exception(exc) 229 else: 230 self.disconnects[fd].set_result(exc) 231 232 def process_exited(self): 233 assert self.state == 'CONNECTED', self.state 234 self.returncode = self.transport.get_returncode() 235 236 237class EventLoopTestsMixin: 238 239 def setUp(self): 240 super().setUp() 241 self.loop = self.create_event_loop() 242 self.set_event_loop(self.loop) 243 244 def tearDown(self): 245 # just in case if we have transport close callbacks 246 if not self.loop.is_closed(): 247 test_utils.run_briefly(self.loop) 248 249 self.doCleanups() 250 support.gc_collect() 251 super().tearDown() 252 253 def test_run_until_complete_nesting(self): 254 async def coro1(): 255 await asyncio.sleep(0) 256 257 async def coro2(): 258 self.assertTrue(self.loop.is_running()) 259 self.loop.run_until_complete(coro1()) 260 261 self.assertRaises( 262 RuntimeError, self.loop.run_until_complete, coro2()) 263 264 # Note: because of the default Windows timing granularity of 265 # 15.6 msec, we use fairly long sleep times here (~100 msec). 266 267 def test_run_until_complete(self): 268 t0 = self.loop.time() 269 self.loop.run_until_complete(asyncio.sleep(0.1)) 270 t1 = self.loop.time() 271 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 272 273 def test_run_until_complete_stopped(self): 274 275 async def cb(): 276 self.loop.stop() 277 await asyncio.sleep(0.1) 278 task = cb() 279 self.assertRaises(RuntimeError, 280 self.loop.run_until_complete, task) 281 282 def test_call_later(self): 283 results = [] 284 285 def callback(arg): 286 results.append(arg) 287 self.loop.stop() 288 289 self.loop.call_later(0.1, callback, 'hello world') 290 t0 = time.monotonic() 291 self.loop.run_forever() 292 t1 = time.monotonic() 293 self.assertEqual(results, ['hello world']) 294 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 295 296 def test_call_soon(self): 297 results = [] 298 299 def callback(arg1, arg2): 300 results.append((arg1, arg2)) 301 self.loop.stop() 302 303 self.loop.call_soon(callback, 'hello', 'world') 304 self.loop.run_forever() 305 self.assertEqual(results, [('hello', 'world')]) 306 307 def test_call_soon_threadsafe(self): 308 results = [] 309 lock = threading.Lock() 310 311 def callback(arg): 312 results.append(arg) 313 if len(results) >= 2: 314 self.loop.stop() 315 316 def run_in_thread(): 317 self.loop.call_soon_threadsafe(callback, 'hello') 318 lock.release() 319 320 lock.acquire() 321 t = threading.Thread(target=run_in_thread) 322 t.start() 323 324 with lock: 325 self.loop.call_soon(callback, 'world') 326 self.loop.run_forever() 327 t.join() 328 self.assertEqual(results, ['hello', 'world']) 329 330 def test_call_soon_threadsafe_same_thread(self): 331 results = [] 332 333 def callback(arg): 334 results.append(arg) 335 if len(results) >= 2: 336 self.loop.stop() 337 338 self.loop.call_soon_threadsafe(callback, 'hello') 339 self.loop.call_soon(callback, 'world') 340 self.loop.run_forever() 341 self.assertEqual(results, ['hello', 'world']) 342 343 def test_run_in_executor(self): 344 def run(arg): 345 return (arg, threading.get_ident()) 346 f2 = self.loop.run_in_executor(None, run, 'yo') 347 res, thread_id = self.loop.run_until_complete(f2) 348 self.assertEqual(res, 'yo') 349 self.assertNotEqual(thread_id, threading.get_ident()) 350 351 def test_run_in_executor_cancel(self): 352 called = False 353 354 def patched_call_soon(*args): 355 nonlocal called 356 called = True 357 358 def run(): 359 time.sleep(0.05) 360 361 f2 = self.loop.run_in_executor(None, run) 362 f2.cancel() 363 self.loop.close() 364 self.loop.call_soon = patched_call_soon 365 self.loop.call_soon_threadsafe = patched_call_soon 366 time.sleep(0.4) 367 self.assertFalse(called) 368 369 def test_reader_callback(self): 370 r, w = socket.socketpair() 371 r.setblocking(False) 372 bytes_read = bytearray() 373 374 def reader(): 375 try: 376 data = r.recv(1024) 377 except BlockingIOError: 378 # Spurious readiness notifications are possible 379 # at least on Linux -- see man select. 380 return 381 if data: 382 bytes_read.extend(data) 383 else: 384 self.assertTrue(self.loop.remove_reader(r.fileno())) 385 r.close() 386 387 self.loop.add_reader(r.fileno(), reader) 388 self.loop.call_soon(w.send, b'abc') 389 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) 390 self.loop.call_soon(w.send, b'def') 391 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) 392 self.loop.call_soon(w.close) 393 self.loop.call_soon(self.loop.stop) 394 self.loop.run_forever() 395 self.assertEqual(bytes_read, b'abcdef') 396 397 def test_writer_callback(self): 398 r, w = socket.socketpair() 399 w.setblocking(False) 400 401 def writer(data): 402 w.send(data) 403 self.loop.stop() 404 405 data = b'x' * 1024 406 self.loop.add_writer(w.fileno(), writer, data) 407 self.loop.run_forever() 408 409 self.assertTrue(self.loop.remove_writer(w.fileno())) 410 self.assertFalse(self.loop.remove_writer(w.fileno())) 411 412 w.close() 413 read = r.recv(len(data) * 2) 414 r.close() 415 self.assertEqual(read, data) 416 417 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL') 418 def test_add_signal_handler(self): 419 caught = 0 420 421 def my_handler(): 422 nonlocal caught 423 caught += 1 424 425 # Check error behavior first. 426 self.assertRaises( 427 TypeError, self.loop.add_signal_handler, 'boom', my_handler) 428 self.assertRaises( 429 TypeError, self.loop.remove_signal_handler, 'boom') 430 self.assertRaises( 431 ValueError, self.loop.add_signal_handler, signal.NSIG+1, 432 my_handler) 433 self.assertRaises( 434 ValueError, self.loop.remove_signal_handler, signal.NSIG+1) 435 self.assertRaises( 436 ValueError, self.loop.add_signal_handler, 0, my_handler) 437 self.assertRaises( 438 ValueError, self.loop.remove_signal_handler, 0) 439 self.assertRaises( 440 ValueError, self.loop.add_signal_handler, -1, my_handler) 441 self.assertRaises( 442 ValueError, self.loop.remove_signal_handler, -1) 443 self.assertRaises( 444 RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, 445 my_handler) 446 # Removing SIGKILL doesn't raise, since we don't call signal(). 447 self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) 448 # Now set a handler and handle it. 449 self.loop.add_signal_handler(signal.SIGINT, my_handler) 450 451 os.kill(os.getpid(), signal.SIGINT) 452 test_utils.run_until(self.loop, lambda: caught) 453 454 # Removing it should restore the default handler. 455 self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) 456 self.assertEqual(signal.getsignal(signal.SIGINT), 457 signal.default_int_handler) 458 # Removing again returns False. 459 self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) 460 461 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 462 def test_signal_handling_while_selecting(self): 463 # Test with a signal actually arriving during a select() call. 464 caught = 0 465 466 def my_handler(): 467 nonlocal caught 468 caught += 1 469 self.loop.stop() 470 471 self.loop.add_signal_handler(signal.SIGALRM, my_handler) 472 473 signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. 474 self.loop.call_later(60, self.loop.stop) 475 self.loop.run_forever() 476 self.assertEqual(caught, 1) 477 478 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 479 def test_signal_handling_args(self): 480 some_args = (42,) 481 caught = 0 482 483 def my_handler(*args): 484 nonlocal caught 485 caught += 1 486 self.assertEqual(args, some_args) 487 self.loop.stop() 488 489 self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) 490 491 signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. 492 self.loop.call_later(60, self.loop.stop) 493 self.loop.run_forever() 494 self.assertEqual(caught, 1) 495 496 def _basetest_create_connection(self, connection_fut, check_sockname=True): 497 tr, pr = self.loop.run_until_complete(connection_fut) 498 self.assertIsInstance(tr, asyncio.Transport) 499 self.assertIsInstance(pr, asyncio.Protocol) 500 self.assertIs(pr.transport, tr) 501 if check_sockname: 502 self.assertIsNotNone(tr.get_extra_info('sockname')) 503 self.loop.run_until_complete(pr.done) 504 self.assertGreater(pr.nbytes, 0) 505 tr.close() 506 507 def test_create_connection(self): 508 with test_utils.run_test_server() as httpd: 509 conn_fut = self.loop.create_connection( 510 lambda: MyProto(loop=self.loop), *httpd.address) 511 self._basetest_create_connection(conn_fut) 512 513 @support.skip_unless_bind_unix_socket 514 def test_create_unix_connection(self): 515 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 516 # zero-length address for UNIX socket. 517 check_sockname = not broken_unix_getsockname() 518 519 with test_utils.run_test_unix_server() as httpd: 520 conn_fut = self.loop.create_unix_connection( 521 lambda: MyProto(loop=self.loop), httpd.address) 522 self._basetest_create_connection(conn_fut, check_sockname) 523 524 def check_ssl_extra_info(self, client, check_sockname=True, 525 peername=None, peercert={}): 526 if check_sockname: 527 self.assertIsNotNone(client.get_extra_info('sockname')) 528 if peername: 529 self.assertEqual(peername, 530 client.get_extra_info('peername')) 531 else: 532 self.assertIsNotNone(client.get_extra_info('peername')) 533 self.assertEqual(peercert, 534 client.get_extra_info('peercert')) 535 536 # test SSL cipher 537 cipher = client.get_extra_info('cipher') 538 self.assertIsInstance(cipher, tuple) 539 self.assertEqual(len(cipher), 3, cipher) 540 self.assertIsInstance(cipher[0], str) 541 self.assertIsInstance(cipher[1], str) 542 self.assertIsInstance(cipher[2], int) 543 544 # test SSL object 545 sslobj = client.get_extra_info('ssl_object') 546 self.assertIsNotNone(sslobj) 547 self.assertEqual(sslobj.compression(), 548 client.get_extra_info('compression')) 549 self.assertEqual(sslobj.cipher(), 550 client.get_extra_info('cipher')) 551 self.assertEqual(sslobj.getpeercert(), 552 client.get_extra_info('peercert')) 553 self.assertEqual(sslobj.compression(), 554 client.get_extra_info('compression')) 555 556 def _basetest_create_ssl_connection(self, connection_fut, 557 check_sockname=True, 558 peername=None): 559 tr, pr = self.loop.run_until_complete(connection_fut) 560 self.assertIsInstance(tr, asyncio.Transport) 561 self.assertIsInstance(pr, asyncio.Protocol) 562 self.assertTrue('ssl' in tr.__class__.__name__.lower()) 563 self.check_ssl_extra_info(tr, check_sockname, peername) 564 self.loop.run_until_complete(pr.done) 565 self.assertGreater(pr.nbytes, 0) 566 tr.close() 567 568 def _test_create_ssl_connection(self, httpd, create_connection, 569 check_sockname=True, peername=None): 570 conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) 571 self._basetest_create_ssl_connection(conn_fut, check_sockname, 572 peername) 573 574 # ssl.Purpose was introduced in Python 3.4 575 if hasattr(ssl, 'Purpose'): 576 def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *, 577 cafile=None, capath=None, 578 cadata=None): 579 """ 580 A ssl.create_default_context() replacement that doesn't enable 581 cert validation. 582 """ 583 self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH) 584 return test_utils.dummy_ssl_context() 585 586 # With ssl=True, ssl.create_default_context() should be called 587 with mock.patch('ssl.create_default_context', 588 side_effect=_dummy_ssl_create_context) as m: 589 conn_fut = create_connection(ssl=True) 590 self._basetest_create_ssl_connection(conn_fut, check_sockname, 591 peername) 592 self.assertEqual(m.call_count, 1) 593 594 # With the real ssl.create_default_context(), certificate 595 # validation will fail 596 with self.assertRaises(ssl.SSLError) as cm: 597 conn_fut = create_connection(ssl=True) 598 # Ignore the "SSL handshake failed" log in debug mode 599 with test_utils.disable_logger(): 600 self._basetest_create_ssl_connection(conn_fut, check_sockname, 601 peername) 602 603 self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 604 605 @unittest.skipIf(ssl is None, 'No ssl module') 606 def test_create_ssl_connection(self): 607 with test_utils.run_test_server(use_ssl=True) as httpd: 608 create_connection = functools.partial( 609 self.loop.create_connection, 610 lambda: MyProto(loop=self.loop), 611 *httpd.address) 612 self._test_create_ssl_connection(httpd, create_connection, 613 peername=httpd.address) 614 615 @support.skip_unless_bind_unix_socket 616 @unittest.skipIf(ssl is None, 'No ssl module') 617 def test_create_ssl_unix_connection(self): 618 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 619 # zero-length address for UNIX socket. 620 check_sockname = not broken_unix_getsockname() 621 622 with test_utils.run_test_unix_server(use_ssl=True) as httpd: 623 create_connection = functools.partial( 624 self.loop.create_unix_connection, 625 lambda: MyProto(loop=self.loop), httpd.address, 626 server_hostname='127.0.0.1') 627 628 self._test_create_ssl_connection(httpd, create_connection, 629 check_sockname, 630 peername=httpd.address) 631 632 def test_create_connection_local_addr(self): 633 with test_utils.run_test_server() as httpd: 634 port = support.find_unused_port() 635 f = self.loop.create_connection( 636 lambda: MyProto(loop=self.loop), 637 *httpd.address, local_addr=(httpd.address[0], port)) 638 tr, pr = self.loop.run_until_complete(f) 639 expected = pr.transport.get_extra_info('sockname')[1] 640 self.assertEqual(port, expected) 641 tr.close() 642 643 def test_create_connection_local_addr_in_use(self): 644 with test_utils.run_test_server() as httpd: 645 f = self.loop.create_connection( 646 lambda: MyProto(loop=self.loop), 647 *httpd.address, local_addr=httpd.address) 648 with self.assertRaises(OSError) as cm: 649 self.loop.run_until_complete(f) 650 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 651 self.assertIn(str(httpd.address), cm.exception.strerror) 652 653 def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): 654 loop = self.loop 655 656 class MyProto(MyBaseProto): 657 658 def connection_lost(self, exc): 659 super().connection_lost(exc) 660 loop.call_soon(loop.stop) 661 662 def data_received(self, data): 663 super().data_received(data) 664 self.transport.write(expected_response) 665 666 lsock = socket.create_server(('127.0.0.1', 0), backlog=1) 667 addr = lsock.getsockname() 668 669 message = b'test data' 670 response = None 671 expected_response = b'roger' 672 673 def client(): 674 nonlocal response 675 try: 676 csock = socket.socket() 677 if client_ssl is not None: 678 csock = client_ssl.wrap_socket(csock) 679 csock.connect(addr) 680 csock.sendall(message) 681 response = csock.recv(99) 682 csock.close() 683 except Exception as exc: 684 print( 685 "Failure in client thread in test_connect_accepted_socket", 686 exc) 687 688 thread = threading.Thread(target=client, daemon=True) 689 thread.start() 690 691 conn, _ = lsock.accept() 692 proto = MyProto(loop=loop) 693 proto.loop = loop 694 loop.run_until_complete( 695 loop.connect_accepted_socket( 696 (lambda: proto), conn, ssl=server_ssl)) 697 loop.run_forever() 698 proto.transport.close() 699 lsock.close() 700 701 support.join_thread(thread, timeout=1) 702 self.assertFalse(thread.is_alive()) 703 self.assertEqual(proto.state, 'CLOSED') 704 self.assertEqual(proto.nbytes, len(message)) 705 self.assertEqual(response, expected_response) 706 707 @unittest.skipIf(ssl is None, 'No ssl module') 708 def test_ssl_connect_accepted_socket(self): 709 if (sys.platform == 'win32' and 710 sys.version_info < (3, 5) and 711 isinstance(self.loop, proactor_events.BaseProactorEventLoop) 712 ): 713 raise unittest.SkipTest( 714 'SSL not supported with proactor event loops before Python 3.5' 715 ) 716 717 server_context = test_utils.simple_server_sslcontext() 718 client_context = test_utils.simple_client_sslcontext() 719 720 self.test_connect_accepted_socket(server_context, client_context) 721 722 def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self): 723 sock = socket.socket() 724 self.addCleanup(sock.close) 725 coro = self.loop.connect_accepted_socket( 726 MyProto, sock, ssl_handshake_timeout=1) 727 with self.assertRaisesRegex( 728 ValueError, 729 'ssl_handshake_timeout is only meaningful with ssl'): 730 self.loop.run_until_complete(coro) 731 732 @mock.patch('asyncio.base_events.socket') 733 def create_server_multiple_hosts(self, family, hosts, mock_sock): 734 async def getaddrinfo(host, port, *args, **kw): 735 if family == socket.AF_INET: 736 return [(family, socket.SOCK_STREAM, 6, '', (host, port))] 737 else: 738 return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))] 739 740 def getaddrinfo_task(*args, **kwds): 741 return self.loop.create_task(getaddrinfo(*args, **kwds)) 742 743 unique_hosts = set(hosts) 744 745 if family == socket.AF_INET: 746 mock_sock.socket().getsockbyname.side_effect = [ 747 (host, 80) for host in unique_hosts] 748 else: 749 mock_sock.socket().getsockbyname.side_effect = [ 750 (host, 80, 0, 0) for host in unique_hosts] 751 self.loop.getaddrinfo = getaddrinfo_task 752 self.loop._start_serving = mock.Mock() 753 self.loop._stop_serving = mock.Mock() 754 f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80) 755 server = self.loop.run_until_complete(f) 756 self.addCleanup(server.close) 757 server_hosts = {sock.getsockbyname()[0] for sock in server.sockets} 758 self.assertEqual(server_hosts, unique_hosts) 759 760 def test_create_server_multiple_hosts_ipv4(self): 761 self.create_server_multiple_hosts(socket.AF_INET, 762 ['1.2.3.4', '5.6.7.8', '1.2.3.4']) 763 764 def test_create_server_multiple_hosts_ipv6(self): 765 self.create_server_multiple_hosts(socket.AF_INET6, 766 ['::1', '::2', '::1']) 767 768 def test_create_server(self): 769 proto = MyProto(self.loop) 770 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) 771 server = self.loop.run_until_complete(f) 772 self.assertEqual(len(server.sockets), 1) 773 sock = server.sockets[0] 774 host, port = sock.getsockname() 775 self.assertEqual(host, '0.0.0.0') 776 client = socket.socket() 777 client.connect(('127.0.0.1', port)) 778 client.sendall(b'xxx') 779 780 self.loop.run_until_complete(proto.connected) 781 self.assertEqual('CONNECTED', proto.state) 782 783 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 784 self.assertEqual(3, proto.nbytes) 785 786 # extra info is available 787 self.assertIsNotNone(proto.transport.get_extra_info('sockname')) 788 self.assertEqual('127.0.0.1', 789 proto.transport.get_extra_info('peername')[0]) 790 791 # close connection 792 proto.transport.close() 793 self.loop.run_until_complete(proto.done) 794 795 self.assertEqual('CLOSED', proto.state) 796 797 # the client socket must be closed after to avoid ECONNRESET upon 798 # recv()/send() on the serving socket 799 client.close() 800 801 # close server 802 server.close() 803 804 @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT') 805 def test_create_server_reuse_port(self): 806 proto = MyProto(self.loop) 807 f = self.loop.create_server( 808 lambda: proto, '0.0.0.0', 0) 809 server = self.loop.run_until_complete(f) 810 self.assertEqual(len(server.sockets), 1) 811 sock = server.sockets[0] 812 self.assertFalse( 813 sock.getsockopt( 814 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 815 server.close() 816 817 test_utils.run_briefly(self.loop) 818 819 proto = MyProto(self.loop) 820 f = self.loop.create_server( 821 lambda: proto, '0.0.0.0', 0, reuse_port=True) 822 server = self.loop.run_until_complete(f) 823 self.assertEqual(len(server.sockets), 1) 824 sock = server.sockets[0] 825 self.assertTrue( 826 sock.getsockopt( 827 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 828 server.close() 829 830 def _make_unix_server(self, factory, **kwargs): 831 path = test_utils.gen_unix_socket_path() 832 self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) 833 834 f = self.loop.create_unix_server(factory, path, **kwargs) 835 server = self.loop.run_until_complete(f) 836 837 return server, path 838 839 @support.skip_unless_bind_unix_socket 840 def test_create_unix_server(self): 841 proto = MyProto(loop=self.loop) 842 server, path = self._make_unix_server(lambda: proto) 843 self.assertEqual(len(server.sockets), 1) 844 845 client = socket.socket(socket.AF_UNIX) 846 client.connect(path) 847 client.sendall(b'xxx') 848 849 self.loop.run_until_complete(proto.connected) 850 self.assertEqual('CONNECTED', proto.state) 851 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 852 self.assertEqual(3, proto.nbytes) 853 854 # close connection 855 proto.transport.close() 856 self.loop.run_until_complete(proto.done) 857 858 self.assertEqual('CLOSED', proto.state) 859 860 # the client socket must be closed after to avoid ECONNRESET upon 861 # recv()/send() on the serving socket 862 client.close() 863 864 # close server 865 server.close() 866 867 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 868 def test_create_unix_server_path_socket_error(self): 869 proto = MyProto(loop=self.loop) 870 sock = socket.socket() 871 with sock: 872 f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock) 873 with self.assertRaisesRegex(ValueError, 874 'path and sock can not be specified ' 875 'at the same time'): 876 self.loop.run_until_complete(f) 877 878 def _create_ssl_context(self, certfile, keyfile=None): 879 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 880 sslcontext.options |= ssl.OP_NO_SSLv2 881 sslcontext.load_cert_chain(certfile, keyfile) 882 return sslcontext 883 884 def _make_ssl_server(self, factory, certfile, keyfile=None): 885 sslcontext = self._create_ssl_context(certfile, keyfile) 886 887 f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext) 888 server = self.loop.run_until_complete(f) 889 890 sock = server.sockets[0] 891 host, port = sock.getsockname() 892 self.assertEqual(host, '127.0.0.1') 893 return server, host, port 894 895 def _make_ssl_unix_server(self, factory, certfile, keyfile=None): 896 sslcontext = self._create_ssl_context(certfile, keyfile) 897 return self._make_unix_server(factory, ssl=sslcontext) 898 899 @unittest.skipIf(ssl is None, 'No ssl module') 900 def test_create_server_ssl(self): 901 proto = MyProto(loop=self.loop) 902 server, host, port = self._make_ssl_server( 903 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 904 905 f_c = self.loop.create_connection(MyBaseProto, host, port, 906 ssl=test_utils.dummy_ssl_context()) 907 client, pr = self.loop.run_until_complete(f_c) 908 909 client.write(b'xxx') 910 self.loop.run_until_complete(proto.connected) 911 self.assertEqual('CONNECTED', proto.state) 912 913 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 914 self.assertEqual(3, proto.nbytes) 915 916 # extra info is available 917 self.check_ssl_extra_info(client, peername=(host, port)) 918 919 # close connection 920 proto.transport.close() 921 self.loop.run_until_complete(proto.done) 922 self.assertEqual('CLOSED', proto.state) 923 924 # the client socket must be closed after to avoid ECONNRESET upon 925 # recv()/send() on the serving socket 926 client.close() 927 928 # stop serving 929 server.close() 930 931 @support.skip_unless_bind_unix_socket 932 @unittest.skipIf(ssl is None, 'No ssl module') 933 def test_create_unix_server_ssl(self): 934 proto = MyProto(loop=self.loop) 935 server, path = self._make_ssl_unix_server( 936 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 937 938 f_c = self.loop.create_unix_connection( 939 MyBaseProto, path, ssl=test_utils.dummy_ssl_context(), 940 server_hostname='') 941 942 client, pr = self.loop.run_until_complete(f_c) 943 944 client.write(b'xxx') 945 self.loop.run_until_complete(proto.connected) 946 self.assertEqual('CONNECTED', proto.state) 947 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 948 self.assertEqual(3, proto.nbytes) 949 950 # close connection 951 proto.transport.close() 952 self.loop.run_until_complete(proto.done) 953 self.assertEqual('CLOSED', proto.state) 954 955 # the client socket must be closed after to avoid ECONNRESET upon 956 # recv()/send() on the serving socket 957 client.close() 958 959 # stop serving 960 server.close() 961 962 @unittest.skipIf(ssl is None, 'No ssl module') 963 def test_create_server_ssl_verify_failed(self): 964 proto = MyProto(loop=self.loop) 965 server, host, port = self._make_ssl_server( 966 lambda: proto, test_utils.SIGNED_CERTFILE) 967 968 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 969 sslcontext_client.options |= ssl.OP_NO_SSLv2 970 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 971 if hasattr(sslcontext_client, 'check_hostname'): 972 sslcontext_client.check_hostname = True 973 974 975 # no CA loaded 976 f_c = self.loop.create_connection(MyProto, host, port, 977 ssl=sslcontext_client) 978 with mock.patch.object(self.loop, 'call_exception_handler'): 979 with test_utils.disable_logger(): 980 with self.assertRaisesRegex(ssl.SSLError, 981 '(?i)certificate.verify.failed'): 982 self.loop.run_until_complete(f_c) 983 984 # execute the loop to log the connection error 985 test_utils.run_briefly(self.loop) 986 987 # close connection 988 self.assertIsNone(proto.transport) 989 server.close() 990 991 @support.skip_unless_bind_unix_socket 992 @unittest.skipIf(ssl is None, 'No ssl module') 993 def test_create_unix_server_ssl_verify_failed(self): 994 proto = MyProto(loop=self.loop) 995 server, path = self._make_ssl_unix_server( 996 lambda: proto, test_utils.SIGNED_CERTFILE) 997 998 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 999 sslcontext_client.options |= ssl.OP_NO_SSLv2 1000 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1001 if hasattr(sslcontext_client, 'check_hostname'): 1002 sslcontext_client.check_hostname = True 1003 1004 # no CA loaded 1005 f_c = self.loop.create_unix_connection(MyProto, path, 1006 ssl=sslcontext_client, 1007 server_hostname='invalid') 1008 with mock.patch.object(self.loop, 'call_exception_handler'): 1009 with test_utils.disable_logger(): 1010 with self.assertRaisesRegex(ssl.SSLError, 1011 '(?i)certificate.verify.failed'): 1012 self.loop.run_until_complete(f_c) 1013 1014 # execute the loop to log the connection error 1015 test_utils.run_briefly(self.loop) 1016 1017 # close connection 1018 self.assertIsNone(proto.transport) 1019 server.close() 1020 1021 @unittest.skipIf(ssl is None, 'No ssl module') 1022 def test_create_server_ssl_match_failed(self): 1023 proto = MyProto(loop=self.loop) 1024 server, host, port = self._make_ssl_server( 1025 lambda: proto, test_utils.SIGNED_CERTFILE) 1026 1027 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1028 sslcontext_client.options |= ssl.OP_NO_SSLv2 1029 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1030 sslcontext_client.load_verify_locations( 1031 cafile=test_utils.SIGNING_CA) 1032 if hasattr(sslcontext_client, 'check_hostname'): 1033 sslcontext_client.check_hostname = True 1034 1035 # incorrect server_hostname 1036 f_c = self.loop.create_connection(MyProto, host, port, 1037 ssl=sslcontext_client) 1038 with mock.patch.object(self.loop, 'call_exception_handler'): 1039 with test_utils.disable_logger(): 1040 with self.assertRaisesRegex( 1041 ssl.CertificateError, 1042 "IP address mismatch, certificate is not valid for " 1043 "'127.0.0.1'"): 1044 self.loop.run_until_complete(f_c) 1045 1046 # close connection 1047 # transport is None because TLS ALERT aborted the handshake 1048 self.assertIsNone(proto.transport) 1049 server.close() 1050 1051 @support.skip_unless_bind_unix_socket 1052 @unittest.skipIf(ssl is None, 'No ssl module') 1053 def test_create_unix_server_ssl_verified(self): 1054 proto = MyProto(loop=self.loop) 1055 server, path = self._make_ssl_unix_server( 1056 lambda: proto, test_utils.SIGNED_CERTFILE) 1057 1058 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1059 sslcontext_client.options |= ssl.OP_NO_SSLv2 1060 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1061 sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA) 1062 if hasattr(sslcontext_client, 'check_hostname'): 1063 sslcontext_client.check_hostname = True 1064 1065 # Connection succeeds with correct CA and server hostname. 1066 f_c = self.loop.create_unix_connection(MyProto, path, 1067 ssl=sslcontext_client, 1068 server_hostname='localhost') 1069 client, pr = self.loop.run_until_complete(f_c) 1070 1071 # close connection 1072 proto.transport.close() 1073 client.close() 1074 server.close() 1075 self.loop.run_until_complete(proto.done) 1076 1077 @unittest.skipIf(ssl is None, 'No ssl module') 1078 def test_create_server_ssl_verified(self): 1079 proto = MyProto(loop=self.loop) 1080 server, host, port = self._make_ssl_server( 1081 lambda: proto, test_utils.SIGNED_CERTFILE) 1082 1083 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1084 sslcontext_client.options |= ssl.OP_NO_SSLv2 1085 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1086 sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA) 1087 if hasattr(sslcontext_client, 'check_hostname'): 1088 sslcontext_client.check_hostname = True 1089 1090 # Connection succeeds with correct CA and server hostname. 1091 f_c = self.loop.create_connection(MyProto, host, port, 1092 ssl=sslcontext_client, 1093 server_hostname='localhost') 1094 client, pr = self.loop.run_until_complete(f_c) 1095 1096 # extra info is available 1097 self.check_ssl_extra_info(client, peername=(host, port), 1098 peercert=test_utils.PEERCERT) 1099 1100 # close connection 1101 proto.transport.close() 1102 client.close() 1103 server.close() 1104 self.loop.run_until_complete(proto.done) 1105 1106 def test_create_server_sock(self): 1107 proto = self.loop.create_future() 1108 1109 class TestMyProto(MyProto): 1110 def connection_made(self, transport): 1111 super().connection_made(transport) 1112 proto.set_result(self) 1113 1114 sock_ob = socket.create_server(('0.0.0.0', 0)) 1115 1116 f = self.loop.create_server(TestMyProto, sock=sock_ob) 1117 server = self.loop.run_until_complete(f) 1118 sock = server.sockets[0] 1119 self.assertEqual(sock.fileno(), sock_ob.fileno()) 1120 1121 host, port = sock.getsockname() 1122 self.assertEqual(host, '0.0.0.0') 1123 client = socket.socket() 1124 client.connect(('127.0.0.1', port)) 1125 client.send(b'xxx') 1126 client.close() 1127 server.close() 1128 1129 def test_create_server_addr_in_use(self): 1130 sock_ob = socket.create_server(('0.0.0.0', 0)) 1131 1132 f = self.loop.create_server(MyProto, sock=sock_ob) 1133 server = self.loop.run_until_complete(f) 1134 sock = server.sockets[0] 1135 host, port = sock.getsockname() 1136 1137 f = self.loop.create_server(MyProto, host=host, port=port) 1138 with self.assertRaises(OSError) as cm: 1139 self.loop.run_until_complete(f) 1140 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 1141 1142 server.close() 1143 1144 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 1145 def test_create_server_dual_stack(self): 1146 f_proto = self.loop.create_future() 1147 1148 class TestMyProto(MyProto): 1149 def connection_made(self, transport): 1150 super().connection_made(transport) 1151 f_proto.set_result(self) 1152 1153 try_count = 0 1154 while True: 1155 try: 1156 port = support.find_unused_port() 1157 f = self.loop.create_server(TestMyProto, host=None, port=port) 1158 server = self.loop.run_until_complete(f) 1159 except OSError as ex: 1160 if ex.errno == errno.EADDRINUSE: 1161 try_count += 1 1162 self.assertGreaterEqual(5, try_count) 1163 continue 1164 else: 1165 raise 1166 else: 1167 break 1168 client = socket.socket() 1169 client.connect(('127.0.0.1', port)) 1170 client.send(b'xxx') 1171 proto = self.loop.run_until_complete(f_proto) 1172 proto.transport.close() 1173 client.close() 1174 1175 f_proto = self.loop.create_future() 1176 client = socket.socket(socket.AF_INET6) 1177 client.connect(('::1', port)) 1178 client.send(b'xxx') 1179 proto = self.loop.run_until_complete(f_proto) 1180 proto.transport.close() 1181 client.close() 1182 1183 server.close() 1184 1185 def test_server_close(self): 1186 f = self.loop.create_server(MyProto, '0.0.0.0', 0) 1187 server = self.loop.run_until_complete(f) 1188 sock = server.sockets[0] 1189 host, port = sock.getsockname() 1190 1191 client = socket.socket() 1192 client.connect(('127.0.0.1', port)) 1193 client.send(b'xxx') 1194 client.close() 1195 1196 server.close() 1197 1198 client = socket.socket() 1199 self.assertRaises( 1200 ConnectionRefusedError, client.connect, ('127.0.0.1', port)) 1201 client.close() 1202 1203 def test_create_datagram_endpoint(self): 1204 class TestMyDatagramProto(MyDatagramProto): 1205 def __init__(inner_self): 1206 super().__init__(loop=self.loop) 1207 1208 def datagram_received(self, data, addr): 1209 super().datagram_received(data, addr) 1210 self.transport.sendto(b'resp:'+data, addr) 1211 1212 coro = self.loop.create_datagram_endpoint( 1213 TestMyDatagramProto, local_addr=('127.0.0.1', 0)) 1214 s_transport, server = self.loop.run_until_complete(coro) 1215 host, port = s_transport.get_extra_info('sockname') 1216 1217 self.assertIsInstance(s_transport, asyncio.Transport) 1218 self.assertIsInstance(server, TestMyDatagramProto) 1219 self.assertEqual('INITIALIZED', server.state) 1220 self.assertIs(server.transport, s_transport) 1221 1222 coro = self.loop.create_datagram_endpoint( 1223 lambda: MyDatagramProto(loop=self.loop), 1224 remote_addr=(host, port)) 1225 transport, client = self.loop.run_until_complete(coro) 1226 1227 self.assertIsInstance(transport, asyncio.Transport) 1228 self.assertIsInstance(client, MyDatagramProto) 1229 self.assertEqual('INITIALIZED', client.state) 1230 self.assertIs(client.transport, transport) 1231 1232 transport.sendto(b'xxx') 1233 test_utils.run_until(self.loop, lambda: server.nbytes) 1234 self.assertEqual(3, server.nbytes) 1235 test_utils.run_until(self.loop, lambda: client.nbytes) 1236 1237 # received 1238 self.assertEqual(8, client.nbytes) 1239 1240 # extra info is available 1241 self.assertIsNotNone(transport.get_extra_info('sockname')) 1242 1243 # close connection 1244 transport.close() 1245 self.loop.run_until_complete(client.done) 1246 self.assertEqual('CLOSED', client.state) 1247 server.transport.close() 1248 1249 def test_create_datagram_endpoint_sock(self): 1250 sock = None 1251 local_address = ('127.0.0.1', 0) 1252 infos = self.loop.run_until_complete( 1253 self.loop.getaddrinfo( 1254 *local_address, type=socket.SOCK_DGRAM)) 1255 for family, type, proto, cname, address in infos: 1256 try: 1257 sock = socket.socket(family=family, type=type, proto=proto) 1258 sock.setblocking(False) 1259 sock.bind(address) 1260 except: 1261 pass 1262 else: 1263 break 1264 else: 1265 assert False, 'Can not create socket.' 1266 1267 f = self.loop.create_datagram_endpoint( 1268 lambda: MyDatagramProto(loop=self.loop), sock=sock) 1269 tr, pr = self.loop.run_until_complete(f) 1270 self.assertIsInstance(tr, asyncio.Transport) 1271 self.assertIsInstance(pr, MyDatagramProto) 1272 tr.close() 1273 self.loop.run_until_complete(pr.done) 1274 1275 def test_internal_fds(self): 1276 loop = self.create_event_loop() 1277 if not isinstance(loop, selector_events.BaseSelectorEventLoop): 1278 loop.close() 1279 self.skipTest('loop is not a BaseSelectorEventLoop') 1280 1281 self.assertEqual(1, loop._internal_fds) 1282 loop.close() 1283 self.assertEqual(0, loop._internal_fds) 1284 self.assertIsNone(loop._csock) 1285 self.assertIsNone(loop._ssock) 1286 1287 @unittest.skipUnless(sys.platform != 'win32', 1288 "Don't support pipes for Windows") 1289 def test_read_pipe(self): 1290 proto = MyReadPipeProto(loop=self.loop) 1291 1292 rpipe, wpipe = os.pipe() 1293 pipeobj = io.open(rpipe, 'rb', 1024) 1294 1295 async def connect(): 1296 t, p = await self.loop.connect_read_pipe( 1297 lambda: proto, pipeobj) 1298 self.assertIs(p, proto) 1299 self.assertIs(t, proto.transport) 1300 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1301 self.assertEqual(0, proto.nbytes) 1302 1303 self.loop.run_until_complete(connect()) 1304 1305 os.write(wpipe, b'1') 1306 test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) 1307 self.assertEqual(1, proto.nbytes) 1308 1309 os.write(wpipe, b'2345') 1310 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1311 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1312 self.assertEqual(5, proto.nbytes) 1313 1314 os.close(wpipe) 1315 self.loop.run_until_complete(proto.done) 1316 self.assertEqual( 1317 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1318 # extra info is available 1319 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1320 1321 @unittest.skipUnless(sys.platform != 'win32', 1322 "Don't support pipes for Windows") 1323 def test_unclosed_pipe_transport(self): 1324 # This test reproduces the issue #314 on GitHub 1325 loop = self.create_event_loop() 1326 read_proto = MyReadPipeProto(loop=loop) 1327 write_proto = MyWritePipeProto(loop=loop) 1328 1329 rpipe, wpipe = os.pipe() 1330 rpipeobj = io.open(rpipe, 'rb', 1024) 1331 wpipeobj = io.open(wpipe, 'w', 1024) 1332 1333 async def connect(): 1334 read_transport, _ = await loop.connect_read_pipe( 1335 lambda: read_proto, rpipeobj) 1336 write_transport, _ = await loop.connect_write_pipe( 1337 lambda: write_proto, wpipeobj) 1338 return read_transport, write_transport 1339 1340 # Run and close the loop without closing the transports 1341 read_transport, write_transport = loop.run_until_complete(connect()) 1342 loop.close() 1343 1344 # These 'repr' calls used to raise an AttributeError 1345 # See Issue #314 on GitHub 1346 self.assertIn('open', repr(read_transport)) 1347 self.assertIn('open', repr(write_transport)) 1348 1349 # Clean up (avoid ResourceWarning) 1350 rpipeobj.close() 1351 wpipeobj.close() 1352 read_transport._pipe = None 1353 write_transport._pipe = None 1354 1355 @unittest.skipUnless(sys.platform != 'win32', 1356 "Don't support pipes for Windows") 1357 def test_read_pty_output(self): 1358 proto = MyReadPipeProto(loop=self.loop) 1359 1360 master, slave = os.openpty() 1361 master_read_obj = io.open(master, 'rb', 0) 1362 1363 async def connect(): 1364 t, p = await self.loop.connect_read_pipe(lambda: proto, 1365 master_read_obj) 1366 self.assertIs(p, proto) 1367 self.assertIs(t, proto.transport) 1368 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1369 self.assertEqual(0, proto.nbytes) 1370 1371 self.loop.run_until_complete(connect()) 1372 1373 os.write(slave, b'1') 1374 test_utils.run_until(self.loop, lambda: proto.nbytes) 1375 self.assertEqual(1, proto.nbytes) 1376 1377 os.write(slave, b'2345') 1378 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1379 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1380 self.assertEqual(5, proto.nbytes) 1381 1382 os.close(slave) 1383 proto.transport.close() 1384 self.loop.run_until_complete(proto.done) 1385 self.assertEqual( 1386 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1387 # extra info is available 1388 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1389 1390 @unittest.skipUnless(sys.platform != 'win32', 1391 "Don't support pipes for Windows") 1392 def test_write_pipe(self): 1393 rpipe, wpipe = os.pipe() 1394 pipeobj = io.open(wpipe, 'wb', 1024) 1395 1396 proto = MyWritePipeProto(loop=self.loop) 1397 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1398 transport, p = self.loop.run_until_complete(connect) 1399 self.assertIs(p, proto) 1400 self.assertIs(transport, proto.transport) 1401 self.assertEqual('CONNECTED', proto.state) 1402 1403 transport.write(b'1') 1404 1405 data = bytearray() 1406 def reader(data): 1407 chunk = os.read(rpipe, 1024) 1408 data += chunk 1409 return len(data) 1410 1411 test_utils.run_until(self.loop, lambda: reader(data) >= 1) 1412 self.assertEqual(b'1', data) 1413 1414 transport.write(b'2345') 1415 test_utils.run_until(self.loop, lambda: reader(data) >= 5) 1416 self.assertEqual(b'12345', data) 1417 self.assertEqual('CONNECTED', proto.state) 1418 1419 os.close(rpipe) 1420 1421 # extra info is available 1422 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1423 1424 # close connection 1425 proto.transport.close() 1426 self.loop.run_until_complete(proto.done) 1427 self.assertEqual('CLOSED', proto.state) 1428 1429 @unittest.skipUnless(sys.platform != 'win32', 1430 "Don't support pipes for Windows") 1431 def test_write_pipe_disconnect_on_close(self): 1432 rsock, wsock = socket.socketpair() 1433 rsock.setblocking(False) 1434 pipeobj = io.open(wsock.detach(), 'wb', 1024) 1435 1436 proto = MyWritePipeProto(loop=self.loop) 1437 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1438 transport, p = self.loop.run_until_complete(connect) 1439 self.assertIs(p, proto) 1440 self.assertIs(transport, proto.transport) 1441 self.assertEqual('CONNECTED', proto.state) 1442 1443 transport.write(b'1') 1444 data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024)) 1445 self.assertEqual(b'1', data) 1446 1447 rsock.close() 1448 1449 self.loop.run_until_complete(proto.done) 1450 self.assertEqual('CLOSED', proto.state) 1451 1452 @unittest.skipUnless(sys.platform != 'win32', 1453 "Don't support pipes for Windows") 1454 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1455 # older than 10.6 (Snow Leopard) 1456 @support.requires_mac_ver(10, 6) 1457 def test_write_pty(self): 1458 master, slave = os.openpty() 1459 slave_write_obj = io.open(slave, 'wb', 0) 1460 1461 proto = MyWritePipeProto(loop=self.loop) 1462 connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj) 1463 transport, p = self.loop.run_until_complete(connect) 1464 self.assertIs(p, proto) 1465 self.assertIs(transport, proto.transport) 1466 self.assertEqual('CONNECTED', proto.state) 1467 1468 transport.write(b'1') 1469 1470 data = bytearray() 1471 def reader(data): 1472 chunk = os.read(master, 1024) 1473 data += chunk 1474 return len(data) 1475 1476 test_utils.run_until(self.loop, lambda: reader(data) >= 1, 1477 timeout=10) 1478 self.assertEqual(b'1', data) 1479 1480 transport.write(b'2345') 1481 test_utils.run_until(self.loop, lambda: reader(data) >= 5, 1482 timeout=10) 1483 self.assertEqual(b'12345', data) 1484 self.assertEqual('CONNECTED', proto.state) 1485 1486 os.close(master) 1487 1488 # extra info is available 1489 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1490 1491 # close connection 1492 proto.transport.close() 1493 self.loop.run_until_complete(proto.done) 1494 self.assertEqual('CLOSED', proto.state) 1495 1496 @unittest.skipUnless(sys.platform != 'win32', 1497 "Don't support pipes for Windows") 1498 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1499 # older than 10.6 (Snow Leopard) 1500 @support.requires_mac_ver(10, 6) 1501 def test_bidirectional_pty(self): 1502 master, read_slave = os.openpty() 1503 write_slave = os.dup(read_slave) 1504 tty.setraw(read_slave) 1505 1506 slave_read_obj = io.open(read_slave, 'rb', 0) 1507 read_proto = MyReadPipeProto(loop=self.loop) 1508 read_connect = self.loop.connect_read_pipe(lambda: read_proto, 1509 slave_read_obj) 1510 read_transport, p = self.loop.run_until_complete(read_connect) 1511 self.assertIs(p, read_proto) 1512 self.assertIs(read_transport, read_proto.transport) 1513 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1514 self.assertEqual(0, read_proto.nbytes) 1515 1516 1517 slave_write_obj = io.open(write_slave, 'wb', 0) 1518 write_proto = MyWritePipeProto(loop=self.loop) 1519 write_connect = self.loop.connect_write_pipe(lambda: write_proto, 1520 slave_write_obj) 1521 write_transport, p = self.loop.run_until_complete(write_connect) 1522 self.assertIs(p, write_proto) 1523 self.assertIs(write_transport, write_proto.transport) 1524 self.assertEqual('CONNECTED', write_proto.state) 1525 1526 data = bytearray() 1527 def reader(data): 1528 chunk = os.read(master, 1024) 1529 data += chunk 1530 return len(data) 1531 1532 write_transport.write(b'1') 1533 test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) 1534 self.assertEqual(b'1', data) 1535 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1536 self.assertEqual('CONNECTED', write_proto.state) 1537 1538 os.write(master, b'a') 1539 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, 1540 timeout=10) 1541 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1542 self.assertEqual(1, read_proto.nbytes) 1543 self.assertEqual('CONNECTED', write_proto.state) 1544 1545 write_transport.write(b'2345') 1546 test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) 1547 self.assertEqual(b'12345', data) 1548 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1549 self.assertEqual('CONNECTED', write_proto.state) 1550 1551 os.write(master, b'bcde') 1552 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, 1553 timeout=10) 1554 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1555 self.assertEqual(5, read_proto.nbytes) 1556 self.assertEqual('CONNECTED', write_proto.state) 1557 1558 os.close(master) 1559 1560 read_transport.close() 1561 self.loop.run_until_complete(read_proto.done) 1562 self.assertEqual( 1563 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) 1564 1565 write_transport.close() 1566 self.loop.run_until_complete(write_proto.done) 1567 self.assertEqual('CLOSED', write_proto.state) 1568 1569 def test_prompt_cancellation(self): 1570 r, w = socket.socketpair() 1571 r.setblocking(False) 1572 f = self.loop.create_task(self.loop.sock_recv(r, 1)) 1573 ov = getattr(f, 'ov', None) 1574 if ov is not None: 1575 self.assertTrue(ov.pending) 1576 1577 async def main(): 1578 try: 1579 self.loop.call_soon(f.cancel) 1580 await f 1581 except asyncio.CancelledError: 1582 res = 'cancelled' 1583 else: 1584 res = None 1585 finally: 1586 self.loop.stop() 1587 return res 1588 1589 start = time.monotonic() 1590 t = self.loop.create_task(main()) 1591 self.loop.run_forever() 1592 elapsed = time.monotonic() - start 1593 1594 self.assertLess(elapsed, 0.1) 1595 self.assertEqual(t.result(), 'cancelled') 1596 self.assertRaises(asyncio.CancelledError, f.result) 1597 if ov is not None: 1598 self.assertFalse(ov.pending) 1599 self.loop._stop_serving(r) 1600 1601 r.close() 1602 w.close() 1603 1604 def test_timeout_rounding(self): 1605 def _run_once(): 1606 self.loop._run_once_counter += 1 1607 orig_run_once() 1608 1609 orig_run_once = self.loop._run_once 1610 self.loop._run_once_counter = 0 1611 self.loop._run_once = _run_once 1612 1613 async def wait(): 1614 loop = self.loop 1615 await asyncio.sleep(1e-2) 1616 await asyncio.sleep(1e-4) 1617 await asyncio.sleep(1e-6) 1618 await asyncio.sleep(1e-8) 1619 await asyncio.sleep(1e-10) 1620 1621 self.loop.run_until_complete(wait()) 1622 # The ideal number of call is 12, but on some platforms, the selector 1623 # may sleep at little bit less than timeout depending on the resolution 1624 # of the clock used by the kernel. Tolerate a few useless calls on 1625 # these platforms. 1626 self.assertLessEqual(self.loop._run_once_counter, 20, 1627 {'clock_resolution': self.loop._clock_resolution, 1628 'selector': self.loop._selector.__class__.__name__}) 1629 1630 def test_remove_fds_after_closing(self): 1631 loop = self.create_event_loop() 1632 callback = lambda: None 1633 r, w = socket.socketpair() 1634 self.addCleanup(r.close) 1635 self.addCleanup(w.close) 1636 loop.add_reader(r, callback) 1637 loop.add_writer(w, callback) 1638 loop.close() 1639 self.assertFalse(loop.remove_reader(r)) 1640 self.assertFalse(loop.remove_writer(w)) 1641 1642 def test_add_fds_after_closing(self): 1643 loop = self.create_event_loop() 1644 callback = lambda: None 1645 r, w = socket.socketpair() 1646 self.addCleanup(r.close) 1647 self.addCleanup(w.close) 1648 loop.close() 1649 with self.assertRaises(RuntimeError): 1650 loop.add_reader(r, callback) 1651 with self.assertRaises(RuntimeError): 1652 loop.add_writer(w, callback) 1653 1654 def test_close_running_event_loop(self): 1655 async def close_loop(loop): 1656 self.loop.close() 1657 1658 coro = close_loop(self.loop) 1659 with self.assertRaises(RuntimeError): 1660 self.loop.run_until_complete(coro) 1661 1662 def test_close(self): 1663 self.loop.close() 1664 1665 async def test(): 1666 pass 1667 1668 func = lambda: False 1669 coro = test() 1670 self.addCleanup(coro.close) 1671 1672 # operation blocked when the loop is closed 1673 with self.assertRaises(RuntimeError): 1674 self.loop.run_forever() 1675 with self.assertRaises(RuntimeError): 1676 fut = self.loop.create_future() 1677 self.loop.run_until_complete(fut) 1678 with self.assertRaises(RuntimeError): 1679 self.loop.call_soon(func) 1680 with self.assertRaises(RuntimeError): 1681 self.loop.call_soon_threadsafe(func) 1682 with self.assertRaises(RuntimeError): 1683 self.loop.call_later(1.0, func) 1684 with self.assertRaises(RuntimeError): 1685 self.loop.call_at(self.loop.time() + .0, func) 1686 with self.assertRaises(RuntimeError): 1687 self.loop.create_task(coro) 1688 with self.assertRaises(RuntimeError): 1689 self.loop.add_signal_handler(signal.SIGTERM, func) 1690 1691 # run_in_executor test is tricky: the method is a coroutine, 1692 # but run_until_complete cannot be called on closed loop. 1693 # Thus iterate once explicitly. 1694 with self.assertRaises(RuntimeError): 1695 it = self.loop.run_in_executor(None, func).__await__() 1696 next(it) 1697 1698 1699class SubprocessTestsMixin: 1700 1701 def check_terminated(self, returncode): 1702 if sys.platform == 'win32': 1703 self.assertIsInstance(returncode, int) 1704 # expect 1 but sometimes get 0 1705 else: 1706 self.assertEqual(-signal.SIGTERM, returncode) 1707 1708 def check_killed(self, returncode): 1709 if sys.platform == 'win32': 1710 self.assertIsInstance(returncode, int) 1711 # expect 1 but sometimes get 0 1712 else: 1713 self.assertEqual(-signal.SIGKILL, returncode) 1714 1715 def test_subprocess_exec(self): 1716 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1717 1718 connect = self.loop.subprocess_exec( 1719 functools.partial(MySubprocessProtocol, self.loop), 1720 sys.executable, prog) 1721 with self.assertWarns(DeprecationWarning): 1722 transp, proto = self.loop.run_until_complete(connect) 1723 self.assertIsInstance(proto, MySubprocessProtocol) 1724 self.loop.run_until_complete(proto.connected) 1725 self.assertEqual('CONNECTED', proto.state) 1726 1727 stdin = transp.get_pipe_transport(0) 1728 stdin.write(b'Python The Winner') 1729 self.loop.run_until_complete(proto.got_data[1].wait()) 1730 with test_utils.disable_logger(): 1731 transp.close() 1732 self.loop.run_until_complete(proto.completed) 1733 self.check_killed(proto.returncode) 1734 self.assertEqual(b'Python The Winner', proto.data[1]) 1735 1736 def test_subprocess_interactive(self): 1737 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1738 1739 connect = self.loop.subprocess_exec( 1740 functools.partial(MySubprocessProtocol, self.loop), 1741 sys.executable, prog) 1742 1743 with self.assertWarns(DeprecationWarning): 1744 transp, proto = self.loop.run_until_complete(connect) 1745 self.assertIsInstance(proto, MySubprocessProtocol) 1746 self.loop.run_until_complete(proto.connected) 1747 self.assertEqual('CONNECTED', proto.state) 1748 1749 stdin = transp.get_pipe_transport(0) 1750 stdin.write(b'Python ') 1751 self.loop.run_until_complete(proto.got_data[1].wait()) 1752 proto.got_data[1].clear() 1753 self.assertEqual(b'Python ', proto.data[1]) 1754 1755 stdin.write(b'The Winner') 1756 self.loop.run_until_complete(proto.got_data[1].wait()) 1757 self.assertEqual(b'Python The Winner', proto.data[1]) 1758 1759 with test_utils.disable_logger(): 1760 transp.close() 1761 self.loop.run_until_complete(proto.completed) 1762 self.check_killed(proto.returncode) 1763 1764 def test_subprocess_shell(self): 1765 with self.assertWarns(DeprecationWarning): 1766 connect = self.loop.subprocess_shell( 1767 functools.partial(MySubprocessProtocol, self.loop), 1768 'echo Python') 1769 transp, proto = self.loop.run_until_complete(connect) 1770 self.assertIsInstance(proto, MySubprocessProtocol) 1771 self.loop.run_until_complete(proto.connected) 1772 1773 transp.get_pipe_transport(0).close() 1774 self.loop.run_until_complete(proto.completed) 1775 self.assertEqual(0, proto.returncode) 1776 self.assertTrue(all(f.done() for f in proto.disconnects.values())) 1777 self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') 1778 self.assertEqual(proto.data[2], b'') 1779 transp.close() 1780 1781 def test_subprocess_exitcode(self): 1782 connect = self.loop.subprocess_shell( 1783 functools.partial(MySubprocessProtocol, self.loop), 1784 'exit 7', stdin=None, stdout=None, stderr=None) 1785 1786 with self.assertWarns(DeprecationWarning): 1787 transp, proto = self.loop.run_until_complete(connect) 1788 self.assertIsInstance(proto, MySubprocessProtocol) 1789 self.loop.run_until_complete(proto.completed) 1790 self.assertEqual(7, proto.returncode) 1791 transp.close() 1792 1793 def test_subprocess_close_after_finish(self): 1794 connect = self.loop.subprocess_shell( 1795 functools.partial(MySubprocessProtocol, self.loop), 1796 'exit 7', stdin=None, stdout=None, stderr=None) 1797 with self.assertWarns(DeprecationWarning): 1798 transp, proto = self.loop.run_until_complete(connect) 1799 self.assertIsInstance(proto, MySubprocessProtocol) 1800 self.assertIsNone(transp.get_pipe_transport(0)) 1801 self.assertIsNone(transp.get_pipe_transport(1)) 1802 self.assertIsNone(transp.get_pipe_transport(2)) 1803 self.loop.run_until_complete(proto.completed) 1804 self.assertEqual(7, proto.returncode) 1805 self.assertIsNone(transp.close()) 1806 1807 def test_subprocess_kill(self): 1808 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1809 1810 connect = self.loop.subprocess_exec( 1811 functools.partial(MySubprocessProtocol, self.loop), 1812 sys.executable, prog) 1813 1814 with self.assertWarns(DeprecationWarning): 1815 transp, proto = self.loop.run_until_complete(connect) 1816 self.assertIsInstance(proto, MySubprocessProtocol) 1817 self.loop.run_until_complete(proto.connected) 1818 1819 transp.kill() 1820 self.loop.run_until_complete(proto.completed) 1821 self.check_killed(proto.returncode) 1822 transp.close() 1823 1824 def test_subprocess_terminate(self): 1825 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1826 1827 connect = self.loop.subprocess_exec( 1828 functools.partial(MySubprocessProtocol, self.loop), 1829 sys.executable, prog) 1830 1831 with self.assertWarns(DeprecationWarning): 1832 transp, proto = self.loop.run_until_complete(connect) 1833 self.assertIsInstance(proto, MySubprocessProtocol) 1834 self.loop.run_until_complete(proto.connected) 1835 1836 transp.terminate() 1837 self.loop.run_until_complete(proto.completed) 1838 self.check_terminated(proto.returncode) 1839 transp.close() 1840 1841 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") 1842 def test_subprocess_send_signal(self): 1843 # bpo-31034: Make sure that we get the default signal handler (killing 1844 # the process). The parent process may have decided to ignore SIGHUP, 1845 # and signal handlers are inherited. 1846 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) 1847 try: 1848 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1849 1850 connect = self.loop.subprocess_exec( 1851 functools.partial(MySubprocessProtocol, self.loop), 1852 sys.executable, prog) 1853 1854 with self.assertWarns(DeprecationWarning): 1855 transp, proto = self.loop.run_until_complete(connect) 1856 self.assertIsInstance(proto, MySubprocessProtocol) 1857 self.loop.run_until_complete(proto.connected) 1858 1859 transp.send_signal(signal.SIGHUP) 1860 self.loop.run_until_complete(proto.completed) 1861 self.assertEqual(-signal.SIGHUP, proto.returncode) 1862 transp.close() 1863 finally: 1864 signal.signal(signal.SIGHUP, old_handler) 1865 1866 def test_subprocess_stderr(self): 1867 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1868 1869 connect = self.loop.subprocess_exec( 1870 functools.partial(MySubprocessProtocol, self.loop), 1871 sys.executable, prog) 1872 1873 with self.assertWarns(DeprecationWarning): 1874 transp, proto = self.loop.run_until_complete(connect) 1875 self.assertIsInstance(proto, MySubprocessProtocol) 1876 self.loop.run_until_complete(proto.connected) 1877 1878 stdin = transp.get_pipe_transport(0) 1879 stdin.write(b'test') 1880 1881 self.loop.run_until_complete(proto.completed) 1882 1883 transp.close() 1884 self.assertEqual(b'OUT:test', proto.data[1]) 1885 self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2]) 1886 self.assertEqual(0, proto.returncode) 1887 1888 def test_subprocess_stderr_redirect_to_stdout(self): 1889 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1890 1891 connect = self.loop.subprocess_exec( 1892 functools.partial(MySubprocessProtocol, self.loop), 1893 sys.executable, prog, stderr=subprocess.STDOUT) 1894 1895 with self.assertWarns(DeprecationWarning): 1896 transp, proto = self.loop.run_until_complete(connect) 1897 self.assertIsInstance(proto, MySubprocessProtocol) 1898 self.loop.run_until_complete(proto.connected) 1899 1900 stdin = transp.get_pipe_transport(0) 1901 self.assertIsNotNone(transp.get_pipe_transport(1)) 1902 self.assertIsNone(transp.get_pipe_transport(2)) 1903 1904 stdin.write(b'test') 1905 self.loop.run_until_complete(proto.completed) 1906 self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'), 1907 proto.data[1]) 1908 self.assertEqual(b'', proto.data[2]) 1909 1910 transp.close() 1911 self.assertEqual(0, proto.returncode) 1912 1913 def test_subprocess_close_client_stream(self): 1914 prog = os.path.join(os.path.dirname(__file__), 'echo3.py') 1915 1916 connect = self.loop.subprocess_exec( 1917 functools.partial(MySubprocessProtocol, self.loop), 1918 sys.executable, prog) 1919 with self.assertWarns(DeprecationWarning): 1920 transp, proto = self.loop.run_until_complete(connect) 1921 self.assertIsInstance(proto, MySubprocessProtocol) 1922 self.loop.run_until_complete(proto.connected) 1923 1924 stdin = transp.get_pipe_transport(0) 1925 stdout = transp.get_pipe_transport(1) 1926 stdin.write(b'test') 1927 self.loop.run_until_complete(proto.got_data[1].wait()) 1928 self.assertEqual(b'OUT:test', proto.data[1]) 1929 1930 stdout.close() 1931 self.loop.run_until_complete(proto.disconnects[1]) 1932 stdin.write(b'xxx') 1933 self.loop.run_until_complete(proto.got_data[2].wait()) 1934 if sys.platform != 'win32': 1935 self.assertEqual(b'ERR:BrokenPipeError', proto.data[2]) 1936 else: 1937 # After closing the read-end of a pipe, writing to the 1938 # write-end using os.write() fails with errno==EINVAL and 1939 # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using 1940 # WriteFile() we get ERROR_BROKEN_PIPE as expected.) 1941 self.assertEqual(b'ERR:OSError', proto.data[2]) 1942 with test_utils.disable_logger(): 1943 transp.close() 1944 self.loop.run_until_complete(proto.completed) 1945 self.check_killed(proto.returncode) 1946 1947 def test_subprocess_wait_no_same_group(self): 1948 # start the new process in a new session 1949 connect = self.loop.subprocess_shell( 1950 functools.partial(MySubprocessProtocol, self.loop), 1951 'exit 7', stdin=None, stdout=None, stderr=None, 1952 start_new_session=True) 1953 _, proto = yield self.loop.run_until_complete(connect) 1954 self.assertIsInstance(proto, MySubprocessProtocol) 1955 self.loop.run_until_complete(proto.completed) 1956 self.assertEqual(7, proto.returncode) 1957 1958 def test_subprocess_exec_invalid_args(self): 1959 async def connect(**kwds): 1960 await self.loop.subprocess_exec( 1961 asyncio.SubprocessProtocol, 1962 'pwd', **kwds) 1963 1964 with self.assertRaises(ValueError): 1965 self.loop.run_until_complete(connect(universal_newlines=True)) 1966 with self.assertRaises(ValueError): 1967 self.loop.run_until_complete(connect(bufsize=4096)) 1968 with self.assertRaises(ValueError): 1969 self.loop.run_until_complete(connect(shell=True)) 1970 1971 def test_subprocess_shell_invalid_args(self): 1972 1973 async def connect(cmd=None, **kwds): 1974 if not cmd: 1975 cmd = 'pwd' 1976 await self.loop.subprocess_shell( 1977 asyncio.SubprocessProtocol, 1978 cmd, **kwds) 1979 1980 with self.assertRaises(ValueError): 1981 self.loop.run_until_complete(connect(['ls', '-l'])) 1982 with self.assertRaises(ValueError): 1983 self.loop.run_until_complete(connect(universal_newlines=True)) 1984 with self.assertRaises(ValueError): 1985 self.loop.run_until_complete(connect(bufsize=4096)) 1986 with self.assertRaises(ValueError): 1987 self.loop.run_until_complete(connect(shell=False)) 1988 1989 1990if sys.platform == 'win32': 1991 1992 class SelectEventLoopTests(EventLoopTestsMixin, 1993 test_utils.TestCase): 1994 1995 def create_event_loop(self): 1996 return asyncio.SelectorEventLoop() 1997 1998 class ProactorEventLoopTests(EventLoopTestsMixin, 1999 SubprocessTestsMixin, 2000 test_utils.TestCase): 2001 2002 def create_event_loop(self): 2003 return asyncio.ProactorEventLoop() 2004 2005 def test_reader_callback(self): 2006 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2007 2008 def test_reader_callback_cancel(self): 2009 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2010 2011 def test_writer_callback(self): 2012 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2013 2014 def test_writer_callback_cancel(self): 2015 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2016 2017 def test_remove_fds_after_closing(self): 2018 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2019else: 2020 import selectors 2021 2022 class UnixEventLoopTestsMixin(EventLoopTestsMixin): 2023 def setUp(self): 2024 super().setUp() 2025 watcher = asyncio.SafeChildWatcher() 2026 watcher.attach_loop(self.loop) 2027 asyncio.set_child_watcher(watcher) 2028 2029 def tearDown(self): 2030 asyncio.set_child_watcher(None) 2031 super().tearDown() 2032 2033 2034 if hasattr(selectors, 'KqueueSelector'): 2035 class KqueueEventLoopTests(UnixEventLoopTestsMixin, 2036 SubprocessTestsMixin, 2037 test_utils.TestCase): 2038 2039 def create_event_loop(self): 2040 return asyncio.SelectorEventLoop( 2041 selectors.KqueueSelector()) 2042 2043 # kqueue doesn't support character devices (PTY) on Mac OS X older 2044 # than 10.9 (Maverick) 2045 @support.requires_mac_ver(10, 9) 2046 # Issue #20667: KqueueEventLoopTests.test_read_pty_output() 2047 # hangs on OpenBSD 5.5 2048 @unittest.skipIf(sys.platform.startswith('openbsd'), 2049 'test hangs on OpenBSD') 2050 def test_read_pty_output(self): 2051 super().test_read_pty_output() 2052 2053 # kqueue doesn't support character devices (PTY) on Mac OS X older 2054 # than 10.9 (Maverick) 2055 @support.requires_mac_ver(10, 9) 2056 def test_write_pty(self): 2057 super().test_write_pty() 2058 2059 if hasattr(selectors, 'EpollSelector'): 2060 class EPollEventLoopTests(UnixEventLoopTestsMixin, 2061 SubprocessTestsMixin, 2062 test_utils.TestCase): 2063 2064 def create_event_loop(self): 2065 return asyncio.SelectorEventLoop(selectors.EpollSelector()) 2066 2067 if hasattr(selectors, 'PollSelector'): 2068 class PollEventLoopTests(UnixEventLoopTestsMixin, 2069 SubprocessTestsMixin, 2070 test_utils.TestCase): 2071 2072 def create_event_loop(self): 2073 return asyncio.SelectorEventLoop(selectors.PollSelector()) 2074 2075 # Should always exist. 2076 class SelectEventLoopTests(UnixEventLoopTestsMixin, 2077 SubprocessTestsMixin, 2078 test_utils.TestCase): 2079 2080 def create_event_loop(self): 2081 return asyncio.SelectorEventLoop(selectors.SelectSelector()) 2082 2083 2084def noop(*args, **kwargs): 2085 pass 2086 2087 2088class HandleTests(test_utils.TestCase): 2089 2090 def setUp(self): 2091 super().setUp() 2092 self.loop = mock.Mock() 2093 self.loop.get_debug.return_value = True 2094 2095 def test_handle(self): 2096 def callback(*args): 2097 return args 2098 2099 args = () 2100 h = asyncio.Handle(callback, args, self.loop) 2101 self.assertIs(h._callback, callback) 2102 self.assertIs(h._args, args) 2103 self.assertFalse(h.cancelled()) 2104 2105 h.cancel() 2106 self.assertTrue(h.cancelled()) 2107 2108 def test_callback_with_exception(self): 2109 def callback(): 2110 raise ValueError() 2111 2112 self.loop = mock.Mock() 2113 self.loop.call_exception_handler = mock.Mock() 2114 2115 h = asyncio.Handle(callback, (), self.loop) 2116 h._run() 2117 2118 self.loop.call_exception_handler.assert_called_with({ 2119 'message': test_utils.MockPattern('Exception in callback.*'), 2120 'exception': mock.ANY, 2121 'handle': h, 2122 'source_traceback': h._source_traceback, 2123 }) 2124 2125 def test_handle_weakref(self): 2126 wd = weakref.WeakValueDictionary() 2127 h = asyncio.Handle(lambda: None, (), self.loop) 2128 wd['h'] = h # Would fail without __weakref__ slot. 2129 2130 def test_handle_repr(self): 2131 self.loop.get_debug.return_value = False 2132 2133 # simple function 2134 h = asyncio.Handle(noop, (1, 2), self.loop) 2135 filename, lineno = test_utils.get_function_source(noop) 2136 self.assertEqual(repr(h), 2137 '<Handle noop(1, 2) at %s:%s>' 2138 % (filename, lineno)) 2139 2140 # cancelled handle 2141 h.cancel() 2142 self.assertEqual(repr(h), 2143 '<Handle cancelled>') 2144 2145 # decorated function 2146 with self.assertWarns(DeprecationWarning): 2147 cb = asyncio.coroutine(noop) 2148 h = asyncio.Handle(cb, (), self.loop) 2149 self.assertEqual(repr(h), 2150 '<Handle noop() at %s:%s>' 2151 % (filename, lineno)) 2152 2153 # partial function 2154 cb = functools.partial(noop, 1, 2) 2155 h = asyncio.Handle(cb, (3,), self.loop) 2156 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 2157 % (re.escape(filename), lineno)) 2158 self.assertRegex(repr(h), regex) 2159 2160 # partial function with keyword args 2161 cb = functools.partial(noop, x=1) 2162 h = asyncio.Handle(cb, (2, 3), self.loop) 2163 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 2164 % (re.escape(filename), lineno)) 2165 self.assertRegex(repr(h), regex) 2166 2167 # partial method 2168 if sys.version_info >= (3, 4): 2169 method = HandleTests.test_handle_repr 2170 cb = functools.partialmethod(method) 2171 filename, lineno = test_utils.get_function_source(method) 2172 h = asyncio.Handle(cb, (), self.loop) 2173 2174 cb_regex = r'<function HandleTests.test_handle_repr .*>' 2175 cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex) 2176 regex = (r'^<Handle %s at %s:%s>$' 2177 % (cb_regex, re.escape(filename), lineno)) 2178 self.assertRegex(repr(h), regex) 2179 2180 def test_handle_repr_debug(self): 2181 self.loop.get_debug.return_value = True 2182 2183 # simple function 2184 create_filename = __file__ 2185 create_lineno = sys._getframe().f_lineno + 1 2186 h = asyncio.Handle(noop, (1, 2), self.loop) 2187 filename, lineno = test_utils.get_function_source(noop) 2188 self.assertEqual(repr(h), 2189 '<Handle noop(1, 2) at %s:%s created at %s:%s>' 2190 % (filename, lineno, create_filename, create_lineno)) 2191 2192 # cancelled handle 2193 h.cancel() 2194 self.assertEqual( 2195 repr(h), 2196 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2197 % (filename, lineno, create_filename, create_lineno)) 2198 2199 # double cancellation won't overwrite _repr 2200 h.cancel() 2201 self.assertEqual( 2202 repr(h), 2203 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2204 % (filename, lineno, create_filename, create_lineno)) 2205 2206 def test_handle_source_traceback(self): 2207 loop = asyncio.get_event_loop_policy().new_event_loop() 2208 loop.set_debug(True) 2209 self.set_event_loop(loop) 2210 2211 def check_source_traceback(h): 2212 lineno = sys._getframe(1).f_lineno - 1 2213 self.assertIsInstance(h._source_traceback, list) 2214 self.assertEqual(h._source_traceback[-1][:3], 2215 (__file__, 2216 lineno, 2217 'test_handle_source_traceback')) 2218 2219 # call_soon 2220 h = loop.call_soon(noop) 2221 check_source_traceback(h) 2222 2223 # call_soon_threadsafe 2224 h = loop.call_soon_threadsafe(noop) 2225 check_source_traceback(h) 2226 2227 # call_later 2228 h = loop.call_later(0, noop) 2229 check_source_traceback(h) 2230 2231 # call_at 2232 h = loop.call_later(0, noop) 2233 check_source_traceback(h) 2234 2235 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 2236 'No collections.abc.Coroutine') 2237 def test_coroutine_like_object_debug_formatting(self): 2238 # Test that asyncio can format coroutines that are instances of 2239 # collections.abc.Coroutine, but lack cr_core or gi_code attributes 2240 # (such as ones compiled with Cython). 2241 2242 coro = CoroLike() 2243 coro.__name__ = 'AAA' 2244 self.assertTrue(asyncio.iscoroutine(coro)) 2245 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2246 2247 coro.__qualname__ = 'BBB' 2248 self.assertEqual(coroutines._format_coroutine(coro), 'BBB()') 2249 2250 coro.cr_running = True 2251 self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running') 2252 2253 coro.__name__ = coro.__qualname__ = None 2254 self.assertEqual(coroutines._format_coroutine(coro), 2255 '<CoroLike without __name__>() running') 2256 2257 coro = CoroLike() 2258 coro.__qualname__ = 'CoroLike' 2259 # Some coroutines might not have '__name__', such as 2260 # built-in async_gen.asend(). 2261 self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()') 2262 2263 coro = CoroLike() 2264 coro.__qualname__ = 'AAA' 2265 coro.cr_code = None 2266 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2267 2268 2269class TimerTests(unittest.TestCase): 2270 2271 def setUp(self): 2272 super().setUp() 2273 self.loop = mock.Mock() 2274 2275 def test_hash(self): 2276 when = time.monotonic() 2277 h = asyncio.TimerHandle(when, lambda: False, (), 2278 mock.Mock()) 2279 self.assertEqual(hash(h), hash(when)) 2280 2281 def test_when(self): 2282 when = time.monotonic() 2283 h = asyncio.TimerHandle(when, lambda: False, (), 2284 mock.Mock()) 2285 self.assertEqual(when, h.when()) 2286 2287 def test_timer(self): 2288 def callback(*args): 2289 return args 2290 2291 args = (1, 2, 3) 2292 when = time.monotonic() 2293 h = asyncio.TimerHandle(when, callback, args, mock.Mock()) 2294 self.assertIs(h._callback, callback) 2295 self.assertIs(h._args, args) 2296 self.assertFalse(h.cancelled()) 2297 2298 # cancel 2299 h.cancel() 2300 self.assertTrue(h.cancelled()) 2301 self.assertIsNone(h._callback) 2302 self.assertIsNone(h._args) 2303 2304 # when cannot be None 2305 self.assertRaises(AssertionError, 2306 asyncio.TimerHandle, None, callback, args, 2307 self.loop) 2308 2309 def test_timer_repr(self): 2310 self.loop.get_debug.return_value = False 2311 2312 # simple function 2313 h = asyncio.TimerHandle(123, noop, (), self.loop) 2314 src = test_utils.get_function_source(noop) 2315 self.assertEqual(repr(h), 2316 '<TimerHandle when=123 noop() at %s:%s>' % src) 2317 2318 # cancelled handle 2319 h.cancel() 2320 self.assertEqual(repr(h), 2321 '<TimerHandle cancelled when=123>') 2322 2323 def test_timer_repr_debug(self): 2324 self.loop.get_debug.return_value = True 2325 2326 # simple function 2327 create_filename = __file__ 2328 create_lineno = sys._getframe().f_lineno + 1 2329 h = asyncio.TimerHandle(123, noop, (), self.loop) 2330 filename, lineno = test_utils.get_function_source(noop) 2331 self.assertEqual(repr(h), 2332 '<TimerHandle when=123 noop() ' 2333 'at %s:%s created at %s:%s>' 2334 % (filename, lineno, create_filename, create_lineno)) 2335 2336 # cancelled handle 2337 h.cancel() 2338 self.assertEqual(repr(h), 2339 '<TimerHandle cancelled when=123 noop() ' 2340 'at %s:%s created at %s:%s>' 2341 % (filename, lineno, create_filename, create_lineno)) 2342 2343 2344 def test_timer_comparison(self): 2345 def callback(*args): 2346 return args 2347 2348 when = time.monotonic() 2349 2350 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2351 h2 = asyncio.TimerHandle(when, callback, (), self.loop) 2352 # TODO: Use assertLess etc. 2353 self.assertFalse(h1 < h2) 2354 self.assertFalse(h2 < h1) 2355 self.assertTrue(h1 <= h2) 2356 self.assertTrue(h2 <= h1) 2357 self.assertFalse(h1 > h2) 2358 self.assertFalse(h2 > h1) 2359 self.assertTrue(h1 >= h2) 2360 self.assertTrue(h2 >= h1) 2361 self.assertTrue(h1 == h2) 2362 self.assertFalse(h1 != h2) 2363 2364 h2.cancel() 2365 self.assertFalse(h1 == h2) 2366 2367 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2368 h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop) 2369 self.assertTrue(h1 < h2) 2370 self.assertFalse(h2 < h1) 2371 self.assertTrue(h1 <= h2) 2372 self.assertFalse(h2 <= h1) 2373 self.assertFalse(h1 > h2) 2374 self.assertTrue(h2 > h1) 2375 self.assertFalse(h1 >= h2) 2376 self.assertTrue(h2 >= h1) 2377 self.assertFalse(h1 == h2) 2378 self.assertTrue(h1 != h2) 2379 2380 h3 = asyncio.Handle(callback, (), self.loop) 2381 self.assertIs(NotImplemented, h1.__eq__(h3)) 2382 self.assertIs(NotImplemented, h1.__ne__(h3)) 2383 2384 2385class AbstractEventLoopTests(unittest.TestCase): 2386 2387 def test_not_implemented(self): 2388 f = mock.Mock() 2389 loop = asyncio.AbstractEventLoop() 2390 self.assertRaises( 2391 NotImplementedError, loop.run_forever) 2392 self.assertRaises( 2393 NotImplementedError, loop.run_until_complete, None) 2394 self.assertRaises( 2395 NotImplementedError, loop.stop) 2396 self.assertRaises( 2397 NotImplementedError, loop.is_running) 2398 self.assertRaises( 2399 NotImplementedError, loop.is_closed) 2400 self.assertRaises( 2401 NotImplementedError, loop.close) 2402 self.assertRaises( 2403 NotImplementedError, loop.create_task, None) 2404 self.assertRaises( 2405 NotImplementedError, loop.call_later, None, None) 2406 self.assertRaises( 2407 NotImplementedError, loop.call_at, f, f) 2408 self.assertRaises( 2409 NotImplementedError, loop.call_soon, None) 2410 self.assertRaises( 2411 NotImplementedError, loop.time) 2412 self.assertRaises( 2413 NotImplementedError, loop.call_soon_threadsafe, None) 2414 self.assertRaises( 2415 NotImplementedError, loop.set_default_executor, f) 2416 self.assertRaises( 2417 NotImplementedError, loop.add_reader, 1, f) 2418 self.assertRaises( 2419 NotImplementedError, loop.remove_reader, 1) 2420 self.assertRaises( 2421 NotImplementedError, loop.add_writer, 1, f) 2422 self.assertRaises( 2423 NotImplementedError, loop.remove_writer, 1) 2424 self.assertRaises( 2425 NotImplementedError, loop.add_signal_handler, 1, f) 2426 self.assertRaises( 2427 NotImplementedError, loop.remove_signal_handler, 1) 2428 self.assertRaises( 2429 NotImplementedError, loop.remove_signal_handler, 1) 2430 self.assertRaises( 2431 NotImplementedError, loop.set_exception_handler, f) 2432 self.assertRaises( 2433 NotImplementedError, loop.default_exception_handler, f) 2434 self.assertRaises( 2435 NotImplementedError, loop.call_exception_handler, f) 2436 self.assertRaises( 2437 NotImplementedError, loop.get_debug) 2438 self.assertRaises( 2439 NotImplementedError, loop.set_debug, f) 2440 2441 def test_not_implemented_async(self): 2442 2443 async def inner(): 2444 f = mock.Mock() 2445 loop = asyncio.AbstractEventLoop() 2446 2447 with self.assertRaises(NotImplementedError): 2448 await loop.run_in_executor(f, f) 2449 with self.assertRaises(NotImplementedError): 2450 await loop.getaddrinfo('localhost', 8080) 2451 with self.assertRaises(NotImplementedError): 2452 await loop.getnameinfo(('localhost', 8080)) 2453 with self.assertRaises(NotImplementedError): 2454 await loop.create_connection(f) 2455 with self.assertRaises(NotImplementedError): 2456 await loop.create_server(f) 2457 with self.assertRaises(NotImplementedError): 2458 await loop.create_datagram_endpoint(f) 2459 with self.assertRaises(NotImplementedError): 2460 await loop.sock_recv(f, 10) 2461 with self.assertRaises(NotImplementedError): 2462 await loop.sock_recv_into(f, 10) 2463 with self.assertRaises(NotImplementedError): 2464 await loop.sock_sendall(f, 10) 2465 with self.assertRaises(NotImplementedError): 2466 await loop.sock_connect(f, f) 2467 with self.assertRaises(NotImplementedError): 2468 await loop.sock_accept(f) 2469 with self.assertRaises(NotImplementedError): 2470 await loop.sock_sendfile(f, f) 2471 with self.assertRaises(NotImplementedError): 2472 await loop.sendfile(f, f) 2473 with self.assertRaises(NotImplementedError): 2474 await loop.connect_read_pipe(f, mock.sentinel.pipe) 2475 with self.assertRaises(NotImplementedError): 2476 await loop.connect_write_pipe(f, mock.sentinel.pipe) 2477 with self.assertRaises(NotImplementedError): 2478 await loop.subprocess_shell(f, mock.sentinel) 2479 with self.assertRaises(NotImplementedError): 2480 await loop.subprocess_exec(f) 2481 2482 loop = asyncio.new_event_loop() 2483 loop.run_until_complete(inner()) 2484 loop.close() 2485 2486 2487class PolicyTests(unittest.TestCase): 2488 2489 def test_event_loop_policy(self): 2490 policy = asyncio.AbstractEventLoopPolicy() 2491 self.assertRaises(NotImplementedError, policy.get_event_loop) 2492 self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 2493 self.assertRaises(NotImplementedError, policy.new_event_loop) 2494 self.assertRaises(NotImplementedError, policy.get_child_watcher) 2495 self.assertRaises(NotImplementedError, policy.set_child_watcher, 2496 object()) 2497 2498 def test_get_event_loop(self): 2499 policy = asyncio.DefaultEventLoopPolicy() 2500 self.assertIsNone(policy._local._loop) 2501 2502 loop = policy.get_event_loop() 2503 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2504 2505 self.assertIs(policy._local._loop, loop) 2506 self.assertIs(loop, policy.get_event_loop()) 2507 loop.close() 2508 2509 def test_get_event_loop_calls_set_event_loop(self): 2510 policy = asyncio.DefaultEventLoopPolicy() 2511 2512 with mock.patch.object( 2513 policy, "set_event_loop", 2514 wraps=policy.set_event_loop) as m_set_event_loop: 2515 2516 loop = policy.get_event_loop() 2517 2518 # policy._local._loop must be set through .set_event_loop() 2519 # (the unix DefaultEventLoopPolicy needs this call to attach 2520 # the child watcher correctly) 2521 m_set_event_loop.assert_called_with(loop) 2522 2523 loop.close() 2524 2525 def test_get_event_loop_after_set_none(self): 2526 policy = asyncio.DefaultEventLoopPolicy() 2527 policy.set_event_loop(None) 2528 self.assertRaises(RuntimeError, policy.get_event_loop) 2529 2530 @mock.patch('asyncio.events.threading.current_thread') 2531 def test_get_event_loop_thread(self, m_current_thread): 2532 2533 def f(): 2534 policy = asyncio.DefaultEventLoopPolicy() 2535 self.assertRaises(RuntimeError, policy.get_event_loop) 2536 2537 th = threading.Thread(target=f) 2538 th.start() 2539 th.join() 2540 2541 def test_new_event_loop(self): 2542 policy = asyncio.DefaultEventLoopPolicy() 2543 2544 loop = policy.new_event_loop() 2545 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2546 loop.close() 2547 2548 def test_set_event_loop(self): 2549 policy = asyncio.DefaultEventLoopPolicy() 2550 old_loop = policy.get_event_loop() 2551 2552 self.assertRaises(AssertionError, policy.set_event_loop, object()) 2553 2554 loop = policy.new_event_loop() 2555 policy.set_event_loop(loop) 2556 self.assertIs(loop, policy.get_event_loop()) 2557 self.assertIsNot(old_loop, policy.get_event_loop()) 2558 loop.close() 2559 old_loop.close() 2560 2561 def test_get_event_loop_policy(self): 2562 policy = asyncio.get_event_loop_policy() 2563 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 2564 self.assertIs(policy, asyncio.get_event_loop_policy()) 2565 2566 def test_set_event_loop_policy(self): 2567 self.assertRaises( 2568 AssertionError, asyncio.set_event_loop_policy, object()) 2569 2570 old_policy = asyncio.get_event_loop_policy() 2571 2572 policy = asyncio.DefaultEventLoopPolicy() 2573 asyncio.set_event_loop_policy(policy) 2574 self.assertIs(policy, asyncio.get_event_loop_policy()) 2575 self.assertIsNot(policy, old_policy) 2576 2577 2578class GetEventLoopTestsMixin: 2579 2580 _get_running_loop_impl = None 2581 _set_running_loop_impl = None 2582 get_running_loop_impl = None 2583 get_event_loop_impl = None 2584 2585 def setUp(self): 2586 self._get_running_loop_saved = events._get_running_loop 2587 self._set_running_loop_saved = events._set_running_loop 2588 self.get_running_loop_saved = events.get_running_loop 2589 self.get_event_loop_saved = events.get_event_loop 2590 2591 events._get_running_loop = type(self)._get_running_loop_impl 2592 events._set_running_loop = type(self)._set_running_loop_impl 2593 events.get_running_loop = type(self).get_running_loop_impl 2594 events.get_event_loop = type(self).get_event_loop_impl 2595 2596 asyncio._get_running_loop = type(self)._get_running_loop_impl 2597 asyncio._set_running_loop = type(self)._set_running_loop_impl 2598 asyncio.get_running_loop = type(self).get_running_loop_impl 2599 asyncio.get_event_loop = type(self).get_event_loop_impl 2600 2601 super().setUp() 2602 2603 self.loop = asyncio.new_event_loop() 2604 asyncio.set_event_loop(self.loop) 2605 2606 if sys.platform != 'win32': 2607 watcher = asyncio.SafeChildWatcher() 2608 watcher.attach_loop(self.loop) 2609 asyncio.set_child_watcher(watcher) 2610 2611 def tearDown(self): 2612 try: 2613 if sys.platform != 'win32': 2614 asyncio.set_child_watcher(None) 2615 2616 super().tearDown() 2617 finally: 2618 self.loop.close() 2619 asyncio.set_event_loop(None) 2620 2621 events._get_running_loop = self._get_running_loop_saved 2622 events._set_running_loop = self._set_running_loop_saved 2623 events.get_running_loop = self.get_running_loop_saved 2624 events.get_event_loop = self.get_event_loop_saved 2625 2626 asyncio._get_running_loop = self._get_running_loop_saved 2627 asyncio._set_running_loop = self._set_running_loop_saved 2628 asyncio.get_running_loop = self.get_running_loop_saved 2629 asyncio.get_event_loop = self.get_event_loop_saved 2630 2631 if sys.platform != 'win32': 2632 2633 def test_get_event_loop_new_process(self): 2634 # Issue bpo-32126: The multiprocessing module used by 2635 # ProcessPoolExecutor is not functional when the 2636 # multiprocessing.synchronize module cannot be imported. 2637 support.import_module('multiprocessing.synchronize') 2638 2639 async def main(): 2640 pool = concurrent.futures.ProcessPoolExecutor() 2641 result = await self.loop.run_in_executor( 2642 pool, _test_get_event_loop_new_process__sub_proc) 2643 pool.shutdown() 2644 return result 2645 2646 self.assertEqual( 2647 self.loop.run_until_complete(main()), 2648 'hello') 2649 2650 def test_get_event_loop_returns_running_loop(self): 2651 class TestError(Exception): 2652 pass 2653 2654 class Policy(asyncio.DefaultEventLoopPolicy): 2655 def get_event_loop(self): 2656 raise TestError 2657 2658 old_policy = asyncio.get_event_loop_policy() 2659 try: 2660 asyncio.set_event_loop_policy(Policy()) 2661 loop = asyncio.new_event_loop() 2662 2663 with self.assertRaises(TestError): 2664 asyncio.get_event_loop() 2665 asyncio.set_event_loop(None) 2666 with self.assertRaises(TestError): 2667 asyncio.get_event_loop() 2668 2669 with self.assertRaisesRegex(RuntimeError, 'no running'): 2670 self.assertIs(asyncio.get_running_loop(), None) 2671 self.assertIs(asyncio._get_running_loop(), None) 2672 2673 async def func(): 2674 self.assertIs(asyncio.get_event_loop(), loop) 2675 self.assertIs(asyncio.get_running_loop(), loop) 2676 self.assertIs(asyncio._get_running_loop(), loop) 2677 2678 loop.run_until_complete(func()) 2679 2680 asyncio.set_event_loop(loop) 2681 with self.assertRaises(TestError): 2682 asyncio.get_event_loop() 2683 2684 asyncio.set_event_loop(None) 2685 with self.assertRaises(TestError): 2686 asyncio.get_event_loop() 2687 2688 finally: 2689 asyncio.set_event_loop_policy(old_policy) 2690 if loop is not None: 2691 loop.close() 2692 2693 with self.assertRaisesRegex(RuntimeError, 'no running'): 2694 self.assertIs(asyncio.get_running_loop(), None) 2695 2696 self.assertIs(asyncio._get_running_loop(), None) 2697 2698 2699class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 2700 2701 _get_running_loop_impl = events._py__get_running_loop 2702 _set_running_loop_impl = events._py__set_running_loop 2703 get_running_loop_impl = events._py_get_running_loop 2704 get_event_loop_impl = events._py_get_event_loop 2705 2706 2707try: 2708 import _asyncio # NoQA 2709except ImportError: 2710 pass 2711else: 2712 2713 class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 2714 2715 _get_running_loop_impl = events._c__get_running_loop 2716 _set_running_loop_impl = events._c__set_running_loop 2717 get_running_loop_impl = events._c_get_running_loop 2718 get_event_loop_impl = events._c_get_event_loop 2719 2720 2721class TestServer(unittest.TestCase): 2722 2723 def test_get_loop(self): 2724 loop = asyncio.new_event_loop() 2725 self.addCleanup(loop.close) 2726 proto = MyProto(loop) 2727 server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0)) 2728 self.assertEqual(server.get_loop(), loop) 2729 server.close() 2730 loop.run_until_complete(server.wait_closed()) 2731 2732 2733class TestAbstractServer(unittest.TestCase): 2734 2735 def test_close(self): 2736 with self.assertRaises(NotImplementedError): 2737 events.AbstractServer().close() 2738 2739 def test_wait_closed(self): 2740 loop = asyncio.new_event_loop() 2741 self.addCleanup(loop.close) 2742 2743 with self.assertRaises(NotImplementedError): 2744 loop.run_until_complete(events.AbstractServer().wait_closed()) 2745 2746 def test_get_loop(self): 2747 with self.assertRaises(NotImplementedError): 2748 events.AbstractServer().get_loop() 2749 2750 2751if __name__ == '__main__': 2752 unittest.main() 2753