"""Tests for events.py."""

import collections.abc
import concurrent.futures
import functools
import io
import os
import platform
import re
import signal
import socket
try:
    import ssl
except ImportError:
    ssl = None
import subprocess
import sys
import threading
import time
import errno
import unittest
from unittest import mock
import weakref

if sys.platform != 'win32':
    import tty

import asyncio
from asyncio import coroutines
from asyncio import events
from asyncio import proactor_events
from asyncio import selector_events
from test.test_asyncio import utils as test_utils
from test import support


def tearDownModule():
    """Reset the global event loop policy once the module's tests finish."""
    asyncio.set_event_loop_policy(None)


def broken_unix_getsockname():
    """Return True if getsockname() on UNIX sockets is considered broken.

    That is the case on AIX and on Mac OS 10.4 or older.
    """
    if sys.platform.startswith("aix"):
        return True
    elif sys.platform != 'darwin':
        return False
    version = platform.mac_ver()[0]
    version = tuple(map(int, version.split('.')))
    return version < (10, 5)


def _test_get_event_loop_new_process__sub_proc():
    """Create and install a fresh loop, run a trivial coroutine, return
    its result ('hello')."""
    async def doit():
        return 'hello'

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop.run_until_complete(doit())


class CoroLike:
    """Object exposing the coroutine method names with no-op bodies."""

    def send(self, v):
        pass

    def throw(self, *exc):
        pass

    def close(self):
        pass

    def __await__(self):
        pass


class MyBaseProto(asyncio.Protocol):
    """Stream protocol that checks callback ordering and counts bytes.

    The state moves INITIAL -> CONNECTED -> (EOF) -> CLOSED and every
    callback asserts it is invoked in a valid state.  When a loop is
    given, the ``connected`` and ``done`` futures are resolved from
    connection_made() and connection_lost() respectively.
    """
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.connected = loop.create_future()
            self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        if self.connected:
            self.connected.set_result(None)

    def data_received(self, data):
        assert self.state == 'CONNECTED', self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MyProto(MyBaseProto):
    """MyBaseProto that writes an HTTP/1.0 GET request once connected."""

    def connection_made(self, transport):
        super().connection_made(transport)
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')


class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol checking INITIAL -> INITIALIZED -> CLOSED.

    Counts received bytes; ``done`` (only created when a loop is given)
    is resolved in connection_lost().
    """
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        assert self.state == 'INITIALIZED', self.state
        self.nbytes += len(data)

    def error_received(self, exc):
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MyReadPipeProto(asyncio.Protocol):
    """Read-pipe protocol that records its state history as a list.

    ``state`` grows from ['INITIAL'] by appending 'CONNECTED', 'EOF' and
    'CLOSED'; each callback asserts the exact history seen so far.
    """
    done = None

    def __init__(self, loop=None):
        self.state = ['INITIAL']
        self.nbytes = 0
        self.transport = None
        if loop is not None:
            self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == ['INITIAL'], self.state
        self.state.append('CONNECTED')

    def data_received(self, data):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.nbytes += len(data)

    def eof_received(self):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.state.append('EOF')

    def connection_lost(self, exc):
        if 'EOF' not in self.state:
            self.state.append('EOF')  # It is okay if EOF is missed.
        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
        self.state.append('CLOSED')
        if self.done:
            self.done.set_result(None)


class MyWritePipeProto(asyncio.BaseProtocol):
    """Write-pipe protocol checking INITIAL -> CONNECTED -> CLOSED."""
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.transport = None
        if loop is not None:
            self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)


class MySubprocessProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol collecting pipe data and lifecycle events.

    Data received on fds 1 and 2 is accumulated in ``data`` and signalled
    through ``got_data``; ``disconnects`` holds one future per fd 0-2,
    resolved (or failed) in pipe_connection_lost().
    """

    def __init__(self, loop):
        self.state = 'INITIAL'
        self.transport = None
        self.connected = loop.create_future()
        self.completed = loop.create_future()
        self.disconnects = {fd: loop.create_future() for fd in range(3)}
        self.data = {1: b'', 2: b''}
        self.returncode = None
        self.got_data = {1: asyncio.Event(loop=loop),
                         2: asyncio.Event(loop=loop)}

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        self.connected.set_result(None)

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        self.completed.set_result(None)

    def pipe_data_received(self, fd, data):
        assert self.state == 'CONNECTED', self.state
        self.data[fd] += data
        self.got_data[fd].set()

    def pipe_connection_lost(self, fd, exc):
        assert self.state == 'CONNECTED', self.state
        if exc:
            self.disconnects[fd].set_exception(exc)
        else:
            self.disconnects[fd].set_result(exc)

    def process_exited(self):
        assert self.state == 'CONNECTED', self.state
        self.returncode = self.transport.get_returncode()


class EventLoopTestsMixin:
    # Mixin with event loop tests.  setUp() relies on the concrete test
    # class providing create_event_loop() and set_event_loop().

    def setUp(self):
        super().setUp()
        self.loop = self.create_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        # just in case if we have transport close callbacks
        if not self.loop.is_closed():
            test_utils.run_briefly(self.loop)

        self.doCleanups()
        support.gc_collect()
        super().tearDown()

    def test_run_until_complete_nesting(self):
        # Calling run_until_complete() from inside a running loop must
        # raise RuntimeError; the inner coroutine is then never awaited.
        async def coro1():
            await asyncio.sleep(0)

        async def coro2():
            self.assertTrue(self.loop.is_running())
            self.loop.run_until_complete(coro1())

        with self.assertWarnsRegex(
            RuntimeWarning,
            r"coroutine \S+ was never awaited"
        ):
            self.assertRaises(
                RuntimeError, self.loop.run_until_complete, coro2())

    # Note: because of the default Windows timing granularity of
    # 15.6 msec, we use fairly long sleep times here (~100 msec).

    def test_run_until_complete(self):
        t0 = self.loop.time()
        self.loop.run_until_complete(asyncio.sleep(0.1))
        t1 = self.loop.time()
        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)

    def test_run_until_complete_stopped(self):
        # Stopping the loop before the awaited coroutine completes must
        # make run_until_complete() raise RuntimeError.

        async def cb():
            self.loop.stop()
            await asyncio.sleep(0.1)
        task = cb()
        self.assertRaises(RuntimeError,
                          self.loop.run_until_complete, task)

    def test_call_later(self):
        results = []

        def callback(arg):
            results.append(arg)
            self.loop.stop()

        self.loop.call_later(0.1, callback, 'hello world')
        t0 = time.monotonic()
        self.loop.run_forever()
        t1 = time.monotonic()
        self.assertEqual(results, ['hello world'])
        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)

    def test_call_soon(self):
        results = []

        def callback(arg1, arg2):
            results.append((arg1, arg2))
            self.loop.stop()

        self.loop.call_soon(callback, 'hello', 'world')
        self.loop.run_forever()
        self.assertEqual(results, [('hello', 'world')])

    def test_call_soon_threadsafe(self):
        results = []
        lock = threading.Lock()

        def callback(arg):
            results.append(arg)
            if len(results) >= 2:
                self.loop.stop()

        def run_in_thread():
            self.loop.call_soon_threadsafe(callback, 'hello')
            lock.release()

        # The lock is held until the thread has scheduled 'hello', so
        # 'world' is always scheduled second.
        lock.acquire()
        t = threading.Thread(target=run_in_thread)
        t.start()

        with lock:
            self.loop.call_soon(callback, 'world')
            self.loop.run_forever()
        t.join()
        self.assertEqual(results, ['hello', 'world'])

    def test_call_soon_threadsafe_same_thread(self):
        results = []

        def callback(arg):
            results.append(arg)
            if len(results) >= 2:
                self.loop.stop()

        self.loop.call_soon_threadsafe(callback, 'hello')
        self.loop.call_soon(callback, 'world')
        self.loop.run_forever()
        self.assertEqual(results, ['hello', 'world'])

    def test_run_in_executor(self):
        def run(arg):
            return (arg, threading.get_ident())
        f2 = self.loop.run_in_executor(None, run, 'yo')
        res, thread_id = self.loop.run_until_complete(f2)
        self.assertEqual(res, 'yo')
        # The callable must have run in a different thread.
        self.assertNotEqual(thread_id, threading.get_ident())

    def test_run_in_executor_cancel(self):
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.close()
        # After cancel + close, the finished executor job must not try
        # to schedule anything on the loop.
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        time.sleep(0.4)
        self.assertFalse(called)

    def test_reader_callback(self):
        r, w = socket.socketpair()
        r.setblocking(False)
        bytes_read = bytearray()

        def reader():
            try:
                data = r.recv(1024)
            except BlockingIOError:
                # Spurious readiness notifications are possible
                # at least on Linux -- see man select.
                return
            if data:
                bytes_read.extend(data)
            else:
                # Empty read: peer closed, unregister and close.
                self.assertTrue(self.loop.remove_reader(r.fileno()))
                r.close()

        self.loop.add_reader(r.fileno(), reader)
        self.loop.call_soon(w.send, b'abc')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
        self.loop.call_soon(w.send, b'def')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
        self.loop.call_soon(w.close)
        self.loop.call_soon(self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(bytes_read, b'abcdef')

    def test_writer_callback(self):
        r, w = socket.socketpair()
        w.setblocking(False)

        def writer(data):
            w.send(data)
            self.loop.stop()

        data = b'x' * 1024
        self.loop.add_writer(w.fileno(), writer, data)
        self.loop.run_forever()

        # The first removal succeeds, the second finds nothing to remove.
        self.assertTrue(self.loop.remove_writer(w.fileno()))
        self.assertFalse(self.loop.remove_writer(w.fileno()))

        w.close()
        read = r.recv(len(data) * 2)
        r.close()
        self.assertEqual(read, data)

    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
    def test_add_signal_handler(self):
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1

        # Check error behavior first.
        self.assertRaises(
            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
        self.assertRaises(
            TypeError, self.loop.remove_signal_handler, 'boom')
        self.assertRaises(
            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
            my_handler)
        self.assertRaises(
            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
        self.assertRaises(
            ValueError, self.loop.add_signal_handler, 0, my_handler)
        self.assertRaises(
            ValueError, self.loop.remove_signal_handler, 0)
        self.assertRaises(
            ValueError, self.loop.add_signal_handler, -1, my_handler)
        self.assertRaises(
            ValueError, self.loop.remove_signal_handler, -1)
        self.assertRaises(
            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
            my_handler)
        # Removing SIGKILL doesn't raise, since we don't call signal().
        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
        # Now set a handler and handle it.
        self.loop.add_signal_handler(signal.SIGINT, my_handler)

        os.kill(os.getpid(), signal.SIGINT)
        test_utils.run_until(self.loop, lambda: caught)

        # Removing it should restore the default handler.
        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
        self.assertEqual(signal.getsignal(signal.SIGINT),
                         signal.default_int_handler)
        # Removing again returns False.
        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))

    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
    def test_signal_handling_while_selecting(self):
        # Test with a signal actually arriving during a select() call.
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler)

        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
        # Safety net: stop the loop even if the signal never arrives.
        self.loop.call_later(60, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)

    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
    def test_signal_handling_args(self):
        some_args = (42,)
        caught = 0

        def my_handler(*args):
            nonlocal caught
            caught += 1
            self.assertEqual(args, some_args)
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)

        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
        # Safety net: stop the loop even if the signal never arrives.
        self.loop.call_later(60, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)

    def _basetest_create_connection(self, connection_fut, check_sockname=True):
        """Run *connection_fut* and check the resulting transport/protocol.

        Waits for the protocol's ``done`` future, requires that some data
        was received, then closes the transport.
        """
        tr, pr = self.loop.run_until_complete(connection_fut)
        self.assertIsInstance(tr, asyncio.Transport)
        self.assertIsInstance(pr, asyncio.Protocol)
        self.assertIs(pr.transport, tr)
        if check_sockname:
            self.assertIsNotNone(tr.get_extra_info('sockname'))
        self.loop.run_until_complete(pr.done)
        self.assertGreater(pr.nbytes, 0)
        tr.close()

    def test_create_connection(self):
        with test_utils.run_test_server() as httpd:
            conn_fut = self.loop.create_connection(
                lambda: MyProto(loop=self.loop), *httpd.address)
            self._basetest_create_connection(conn_fut)

    @support.skip_unless_bind_unix_socket
    def test_create_unix_connection(self):
        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
        # zero-length address for UNIX socket.
        check_sockname = not broken_unix_getsockname()

        with test_utils.run_test_unix_server() as httpd:
            conn_fut = self.loop.create_unix_connection(
                lambda: MyProto(loop=self.loop), httpd.address)
            self._basetest_create_connection(conn_fut, check_sockname)

    def check_ssl_extra_info(self, client, check_sockname=True,
                             peername=None, peercert={}):
        """Check the SSL-related extra info exposed by transport *client*.

        *peername*, when truthy, must equal the transport's 'peername';
        *peercert* must equal its 'peercert'.  The default dict is only
        compared against, never mutated.
        """
        if check_sockname:
            self.assertIsNotNone(client.get_extra_info('sockname'))
        if peername:
            self.assertEqual(peername,
                             client.get_extra_info('peername'))
        else:
            self.assertIsNotNone(client.get_extra_info('peername'))
        self.assertEqual(peercert,
                         client.get_extra_info('peercert'))

        # test SSL cipher
        cipher = client.get_extra_info('cipher')
        self.assertIsInstance(cipher, tuple)
        self.assertEqual(len(cipher), 3, cipher)
        self.assertIsInstance(cipher[0], str)
        self.assertIsInstance(cipher[1], str)
        self.assertIsInstance(cipher[2], int)

        # test SSL object: its accessors must agree with the extra info
        sslobj = client.get_extra_info('ssl_object')
        self.assertIsNotNone(sslobj)
        self.assertEqual(sslobj.compression(),
                         client.get_extra_info('compression'))
        self.assertEqual(sslobj.cipher(),
                         client.get_extra_info('cipher'))
        self.assertEqual(sslobj.getpeercert(),
                         client.get_extra_info('peercert'))

    def _basetest_create_ssl_connection(self, connection_fut,
                                        check_sockname=True,
                                        peername=None):
        """Run *connection_fut* and check the resulting SSL transport.

        Same flow as _basetest_create_connection(), plus the SSL extra
        info checks.
        """
        tr, pr = self.loop.run_until_complete(connection_fut)
        self.assertIsInstance(tr, asyncio.Transport)
        self.assertIsInstance(pr, asyncio.Protocol)
        self.assertIn('ssl', tr.__class__.__name__.lower())
        self.check_ssl_extra_info(tr, check_sockname, peername)
        self.loop.run_until_complete(pr.done)
        self.assertGreater(pr.nbytes, 0)
        tr.close()

    def _test_create_ssl_connection(self, httpd, create_connection,
                                    check_sockname=True, peername=None):
        """Exercise *create_connection* with an explicit context, with
        ssl=True and a patched default context, and with ssl=True and the
        real default context (which must fail certificate verification).
        """
        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
        self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                             peername)

        # ssl.Purpose was introduced in Python 3.4
        if hasattr(ssl, 'Purpose'):
            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
                                          cafile=None, capath=None,
                                          cadata=None):
                """
                A ssl.create_default_context() replacement that doesn't enable
                cert validation.
                """
                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
                return test_utils.dummy_ssl_context()

            # With ssl=True, ssl.create_default_context() should be called
            with mock.patch('ssl.create_default_context',
                            side_effect=_dummy_ssl_create_context) as m:
                conn_fut = create_connection(ssl=True)
                self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                                     peername)
                self.assertEqual(m.call_count, 1)

        # With the real ssl.create_default_context(), certificate
        # validation will fail
        with self.assertRaises(ssl.SSLError) as cm:
            conn_fut = create_connection(ssl=True)
            # Ignore the "SSL handshake failed" log in debug mode
            with test_utils.disable_logger():
                self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                                     peername)

        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_ssl_connection(self):
        with test_utils.run_test_server(use_ssl=True) as httpd:
            create_connection = functools.partial(
                self.loop.create_connection,
                lambda: MyProto(loop=self.loop),
                *httpd.address)
            self._test_create_ssl_connection(httpd, create_connection,
                                             peername=httpd.address)

    @support.skip_unless_bind_unix_socket
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_ssl_unix_connection(self):
        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
        # zero-length address for UNIX socket.
        check_sockname = not broken_unix_getsockname()

        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
            create_connection = functools.partial(
                self.loop.create_unix_connection,
                lambda: MyProto(loop=self.loop), httpd.address,
                server_hostname='127.0.0.1')

            self._test_create_ssl_connection(httpd, create_connection,
                                             check_sockname,
                                             peername=httpd.address)

    def test_create_connection_local_addr(self):
        with test_utils.run_test_server() as httpd:
            port = support.find_unused_port()
            f = self.loop.create_connection(
                lambda: MyProto(loop=self.loop),
                *httpd.address, local_addr=(httpd.address[0], port))
            tr, pr = self.loop.run_until_complete(f)
            expected = pr.transport.get_extra_info('sockname')[1]
            self.assertEqual(port, expected)
            tr.close()

    def test_create_connection_local_addr_in_use(self):
        # Binding the client to the server's own address must fail with
        # EADDRINUSE and mention that address in the error text.
        with test_utils.run_test_server() as httpd:
            f = self.loop.create_connection(
                lambda: MyProto(loop=self.loop),
                *httpd.address, local_addr=httpd.address)
            with self.assertRaises(OSError) as cm:
                self.loop.run_until_complete(f)
            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
            self.assertIn(str(httpd.address), cm.exception.strerror)

    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
        # Also called directly by test_ssl_connect_accepted_socket() with
        # SSL contexts for both sides.
        loop = self.loop

        class MyProto(MyBaseProto):

            def connection_lost(self, exc):
                super().connection_lost(exc)
                loop.call_soon(loop.stop)

            def data_received(self, data):
                super().data_received(data)
                self.transport.write(expected_response)

        lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
        addr = lsock.getsockname()

        message = b'test data'
        response = None
        expected_response = b'roger'

        def client():
            nonlocal response
            try:
                csock = socket.socket()
                if client_ssl is not None:
                    csock = client_ssl.wrap_socket(csock)
                csock.connect(addr)
                csock.sendall(message)
                response = csock.recv(99)
                csock.close()
            except Exception as exc:
                # Reported only; a failure here leaves `response` unset
                # and is caught by the final assertion below.
                print(
                    "Failure in client thread in test_connect_accepted_socket",
                    exc)

        thread = threading.Thread(target=client, daemon=True)
        thread.start()

        conn, _ = lsock.accept()
        proto = MyProto(loop=loop)
        proto.loop = loop
        loop.run_until_complete(
            loop.connect_accepted_socket(
                (lambda: proto), conn, ssl=server_ssl))
        # Runs until connection_lost() schedules loop.stop().
        loop.run_forever()
        proto.transport.close()
        lsock.close()

        support.join_thread(thread, timeout=1)
        self.assertFalse(thread.is_alive())
        self.assertEqual(proto.state, 'CLOSED')
        self.assertEqual(proto.nbytes, len(message))
        self.assertEqual(response, expected_response)

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_ssl_connect_accepted_socket(self):
        if (sys.platform == 'win32' and
            sys.version_info < (3, 5) and
            isinstance(self.loop, proactor_events.BaseProactorEventLoop)
            ):
            raise unittest.SkipTest(
                'SSL not supported with proactor event loops before Python 3.5'
            )

        server_context = test_utils.simple_server_sslcontext()
        client_context = test_utils.simple_client_sslcontext()

        self.test_connect_accepted_socket(server_context, client_context)

    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
        sock = socket.socket()
        self.addCleanup(sock.close)
        coro = self.loop.connect_accepted_socket(
            MyProto, sock, ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)

    @mock.patch('asyncio.base_events.socket')
    def create_server_multiple_hosts(self, family, hosts, mock_sock):
        """Create a server on *hosts* with the socket module mocked out and
        check that exactly one socket per unique host is served.

        *mock_sock* is injected by the mock.patch decorator.
        """
        async def getaddrinfo(host, port, *args, **kw):
            if family == socket.AF_INET:
                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
            else:
                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        unique_hosts = set(hosts)

        # 'getsockbyname' is an attribute of the mocked socket; it is
        # configured here and read back below from the same mock.
        if family == socket.AF_INET:
            mock_sock.socket().getsockbyname.side_effect = [
                (host, 80) for host in unique_hosts]
        else:
            mock_sock.socket().getsockbyname.side_effect = [
                (host, 80, 0, 0) for host in unique_hosts]
        self.loop.getaddrinfo = getaddrinfo_task
        self.loop._start_serving = mock.Mock()
        self.loop._stop_serving = mock.Mock()
        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
        server = self.loop.run_until_complete(f)
        self.addCleanup(server.close)
        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
        self.assertEqual(server_hosts, unique_hosts)

    def test_create_server_multiple_hosts_ipv4(self):
        self.create_server_multiple_hosts(socket.AF_INET,
                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])

    def test_create_server_multiple_hosts_ipv6(self):
        self.create_server_multiple_hosts(socket.AF_INET6,
                                          ['::1', '::2', '::1'])

    def test_create_server(self):
        proto = MyProto(self.loop)
        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        host, port = sock.getsockname()
        self.assertEqual(host, '0.0.0.0')
        client = socket.socket()
        client.connect(('127.0.0.1', port))
        client.sendall(b'xxx')

        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # extra info is available
        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
        self.assertEqual('127.0.0.1',
                         proto.transport.get_extra_info('peername')[0])

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)

        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()

    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
    def test_create_server_reuse_port(self):
        # Without reuse_port the option must be off on the listening socket.
        proto = MyProto(self.loop)
        f = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        self.assertFalse(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT))
        server.close()

        test_utils.run_briefly(self.loop)

        # With reuse_port=True the option must be set.
        proto = MyProto(self.loop)
        f = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0, reuse_port=True)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        self.assertTrue(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT))
        server.close()

    def _make_unix_server(self, factory, **kwargs):
        """Start a UNIX server on a generated path; return (server, path).

        The socket file is removed on cleanup if it still exists.
        """
        path = test_utils.gen_unix_socket_path()
        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))

        f = self.loop.create_unix_server(factory, path, **kwargs)
        server = self.loop.run_until_complete(f)

        return server, path

    @support.skip_unless_bind_unix_socket
    def test_create_unix_server(self):
        proto = MyProto(loop=self.loop)
        server, path = self._make_unix_server(lambda: proto)
        self.assertEqual(len(server.sockets), 1)

        client = socket.socket(socket.AF_UNIX)
        client.connect(path)
        client.sendall(b'xxx')

        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)
        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)

        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
    def test_create_unix_server_path_socket_error(self):
        proto = MyProto(loop=self.loop)
        sock = socket.socket()
        with sock:
            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'path and sock can not be specified '
                                        'at the same time'):
                self.loop.run_until_complete(f)

    def _create_ssl_context(self, certfile, keyfile=None):
        """Return a server-side SSLContext loaded with the given cert chain."""
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.load_cert_chain(certfile, keyfile)
        return sslcontext

    def _make_ssl_server(self, factory, certfile, keyfile=None):
        """Start an SSL server on 127.0.0.1; return (server, host, port)."""
        sslcontext = self._create_ssl_context(certfile, keyfile)

        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
        server = self.loop.run_until_complete(f)

        sock = server.sockets[0]
        host, port = sock.getsockname()
        self.assertEqual(host, '127.0.0.1')
        return server, host, port

    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
        """Start an SSL UNIX server; return (server, path)."""
        sslcontext = self._create_ssl_context(certfile, keyfile)
        return self._make_unix_server(factory, ssl=sslcontext)

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl(self):
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)

        f_c = self.loop.create_connection(MyBaseProto, host, port,
                                          ssl=test_utils.dummy_ssl_context())
        client, pr = self.loop.run_until_complete(f_c)

        client.write(b'xxx')
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # extra info is available
        self.check_ssl_extra_info(client, peername=(host, port))

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # stop serving
        server.close()

    @support.skip_unless_bind_unix_socket
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_unix_server_ssl(self):
        proto = MyProto(loop=self.loop)
        server, path = self._make_ssl_unix_server(
            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)

        f_c = self.loop.create_unix_connection(
            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
            server_hostname='')

        client, pr = self.loop.run_until_complete(f_c)

        client.write(b'xxx')
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)
        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # stop serving
        server.close()

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl_verify_failed(self):
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, test_utils.SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # no CA loaded
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client)
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(ssl.SSLError,
                                            '(?i)certificate.verify.failed'):
                    self.loop.run_until_complete(f_c)

            # execute the loop to log the connection error
            test_utils.run_briefly(self.loop)

        # close connection
        self.assertIsNone(proto.transport)
        server.close()

    @support.skip_unless_bind_unix_socket
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_unix_server_ssl_verify_failed(self):
        proto = MyProto(loop=self.loop)
        server, path = self._make_ssl_unix_server(
            lambda: proto, test_utils.SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # no CA loaded
        f_c = self.loop.create_unix_connection(MyProto, path,
                                               ssl=sslcontext_client,
                                               server_hostname='invalid')
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(ssl.SSLError,
                                            '(?i)certificate.verify.failed'):
                    self.loop.run_until_complete(f_c)

            # execute the loop to log the connection error
            test_utils.run_briefly(self.loop)

        # close connection
        self.assertIsNone(proto.transport)
        server.close()

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl_match_failed(self):
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, test_utils.SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(
            cafile=test_utils.SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # incorrect server_hostname
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client)
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(
                        ssl.CertificateError,
                        "IP address mismatch, certificate is not valid for "
                        "'127.0.0.1'"):
                    self.loop.run_until_complete(f_c)

        # close connection
        # transport is None because TLS ALERT aborted the handshake
        self.assertIsNone(proto.transport)
        server.close()

    @support.skip_unless_bind_unix_socket
    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_unix_server_ssl_verified(self):
        proto = MyProto(loop=self.loop)
        server, path = self._make_ssl_unix_server(
            lambda: proto, test_utils.SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # Connection succeeds with correct CA and server hostname.
        f_c = self.loop.create_unix_connection(MyProto, path,
                                               ssl=sslcontext_client,
                                               server_hostname='localhost')
        client, pr = self.loop.run_until_complete(f_c)

        # close connection
        proto.transport.close()
        client.close()
        server.close()
        self.loop.run_until_complete(proto.done)

    @unittest.skipIf(ssl is None, 'No ssl module')
    def test_create_server_ssl_verified(self):
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, test_utils.SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # Connection succeeds with correct CA and server hostname.
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client,
                                          server_hostname='localhost')
        client, pr = self.loop.run_until_complete(f_c)

        # extra info is available
        self.check_ssl_extra_info(client, peername=(host, port),
                                  peercert=test_utils.PEERCERT)

        # close connection
        proto.transport.close()
        client.close()
        server.close()
        self.loop.run_until_complete(proto.done)

    def test_create_server_sock(self):
        proto = self.loop.create_future()

        class TestMyProto(MyProto):
            def connection_made(self, transport):
                super().connection_made(transport)
                proto.set_result(self)

        sock_ob = socket.create_server(('0.0.0.0', 0))

        f = self.loop.create_server(TestMyProto, sock=sock_ob)
        server = self.loop.run_until_complete(f)
        sock = server.sockets[0]
        self.assertEqual(sock.fileno(), sock_ob.fileno())

        host, port = sock.getsockname()
        self.assertEqual(host, '0.0.0.0')
client = socket.socket() 1128 client.connect(('127.0.0.1', port)) 1129 client.send(b'xxx') 1130 client.close() 1131 server.close() 1132 1133 def test_create_server_addr_in_use(self): 1134 sock_ob = socket.create_server(('0.0.0.0', 0)) 1135 1136 f = self.loop.create_server(MyProto, sock=sock_ob) 1137 server = self.loop.run_until_complete(f) 1138 sock = server.sockets[0] 1139 host, port = sock.getsockname() 1140 1141 f = self.loop.create_server(MyProto, host=host, port=port) 1142 with self.assertRaises(OSError) as cm: 1143 self.loop.run_until_complete(f) 1144 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 1145 1146 server.close() 1147 1148 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 1149 def test_create_server_dual_stack(self): 1150 f_proto = self.loop.create_future() 1151 1152 class TestMyProto(MyProto): 1153 def connection_made(self, transport): 1154 super().connection_made(transport) 1155 f_proto.set_result(self) 1156 1157 try_count = 0 1158 while True: 1159 try: 1160 port = support.find_unused_port() 1161 f = self.loop.create_server(TestMyProto, host=None, port=port) 1162 server = self.loop.run_until_complete(f) 1163 except OSError as ex: 1164 if ex.errno == errno.EADDRINUSE: 1165 try_count += 1 1166 self.assertGreaterEqual(5, try_count) 1167 continue 1168 else: 1169 raise 1170 else: 1171 break 1172 client = socket.socket() 1173 client.connect(('127.0.0.1', port)) 1174 client.send(b'xxx') 1175 proto = self.loop.run_until_complete(f_proto) 1176 proto.transport.close() 1177 client.close() 1178 1179 f_proto = self.loop.create_future() 1180 client = socket.socket(socket.AF_INET6) 1181 client.connect(('::1', port)) 1182 client.send(b'xxx') 1183 proto = self.loop.run_until_complete(f_proto) 1184 proto.transport.close() 1185 client.close() 1186 1187 server.close() 1188 1189 def test_server_close(self): 1190 f = self.loop.create_server(MyProto, '0.0.0.0', 0) 1191 server = self.loop.run_until_complete(f) 1192 sock = 
server.sockets[0] 1193 host, port = sock.getsockname() 1194 1195 client = socket.socket() 1196 client.connect(('127.0.0.1', port)) 1197 client.send(b'xxx') 1198 client.close() 1199 1200 server.close() 1201 1202 client = socket.socket() 1203 self.assertRaises( 1204 ConnectionRefusedError, client.connect, ('127.0.0.1', port)) 1205 client.close() 1206 1207 def _test_create_datagram_endpoint(self, local_addr, family): 1208 class TestMyDatagramProto(MyDatagramProto): 1209 def __init__(inner_self): 1210 super().__init__(loop=self.loop) 1211 1212 def datagram_received(self, data, addr): 1213 super().datagram_received(data, addr) 1214 self.transport.sendto(b'resp:'+data, addr) 1215 1216 coro = self.loop.create_datagram_endpoint( 1217 TestMyDatagramProto, local_addr=local_addr, family=family) 1218 s_transport, server = self.loop.run_until_complete(coro) 1219 sockname = s_transport.get_extra_info('sockname') 1220 host, port = socket.getnameinfo( 1221 sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV) 1222 1223 self.assertIsInstance(s_transport, asyncio.Transport) 1224 self.assertIsInstance(server, TestMyDatagramProto) 1225 self.assertEqual('INITIALIZED', server.state) 1226 self.assertIs(server.transport, s_transport) 1227 1228 coro = self.loop.create_datagram_endpoint( 1229 lambda: MyDatagramProto(loop=self.loop), 1230 remote_addr=(host, port)) 1231 transport, client = self.loop.run_until_complete(coro) 1232 1233 self.assertIsInstance(transport, asyncio.Transport) 1234 self.assertIsInstance(client, MyDatagramProto) 1235 self.assertEqual('INITIALIZED', client.state) 1236 self.assertIs(client.transport, transport) 1237 1238 transport.sendto(b'xxx') 1239 test_utils.run_until(self.loop, lambda: server.nbytes) 1240 self.assertEqual(3, server.nbytes) 1241 test_utils.run_until(self.loop, lambda: client.nbytes) 1242 1243 # received 1244 self.assertEqual(8, client.nbytes) 1245 1246 # extra info is available 1247 self.assertIsNotNone(transport.get_extra_info('sockname')) 1248 1249 
# close connection 1250 transport.close() 1251 self.loop.run_until_complete(client.done) 1252 self.assertEqual('CLOSED', client.state) 1253 server.transport.close() 1254 1255 def test_create_datagram_endpoint(self): 1256 self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET) 1257 1258 @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') 1259 def test_create_datagram_endpoint_ipv6(self): 1260 self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6) 1261 1262 def test_create_datagram_endpoint_sock(self): 1263 sock = None 1264 local_address = ('127.0.0.1', 0) 1265 infos = self.loop.run_until_complete( 1266 self.loop.getaddrinfo( 1267 *local_address, type=socket.SOCK_DGRAM)) 1268 for family, type, proto, cname, address in infos: 1269 try: 1270 sock = socket.socket(family=family, type=type, proto=proto) 1271 sock.setblocking(False) 1272 sock.bind(address) 1273 except: 1274 pass 1275 else: 1276 break 1277 else: 1278 assert False, 'Can not create socket.' 
1279 1280 f = self.loop.create_datagram_endpoint( 1281 lambda: MyDatagramProto(loop=self.loop), sock=sock) 1282 tr, pr = self.loop.run_until_complete(f) 1283 self.assertIsInstance(tr, asyncio.Transport) 1284 self.assertIsInstance(pr, MyDatagramProto) 1285 tr.close() 1286 self.loop.run_until_complete(pr.done) 1287 1288 def test_internal_fds(self): 1289 loop = self.create_event_loop() 1290 if not isinstance(loop, selector_events.BaseSelectorEventLoop): 1291 loop.close() 1292 self.skipTest('loop is not a BaseSelectorEventLoop') 1293 1294 self.assertEqual(1, loop._internal_fds) 1295 loop.close() 1296 self.assertEqual(0, loop._internal_fds) 1297 self.assertIsNone(loop._csock) 1298 self.assertIsNone(loop._ssock) 1299 1300 @unittest.skipUnless(sys.platform != 'win32', 1301 "Don't support pipes for Windows") 1302 def test_read_pipe(self): 1303 proto = MyReadPipeProto(loop=self.loop) 1304 1305 rpipe, wpipe = os.pipe() 1306 pipeobj = io.open(rpipe, 'rb', 1024) 1307 1308 async def connect(): 1309 t, p = await self.loop.connect_read_pipe( 1310 lambda: proto, pipeobj) 1311 self.assertIs(p, proto) 1312 self.assertIs(t, proto.transport) 1313 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1314 self.assertEqual(0, proto.nbytes) 1315 1316 self.loop.run_until_complete(connect()) 1317 1318 os.write(wpipe, b'1') 1319 test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) 1320 self.assertEqual(1, proto.nbytes) 1321 1322 os.write(wpipe, b'2345') 1323 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1324 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1325 self.assertEqual(5, proto.nbytes) 1326 1327 os.close(wpipe) 1328 self.loop.run_until_complete(proto.done) 1329 self.assertEqual( 1330 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1331 # extra info is available 1332 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1333 1334 @unittest.skipUnless(sys.platform != 'win32', 1335 "Don't support pipes for Windows") 1336 def 
test_unclosed_pipe_transport(self): 1337 # This test reproduces the issue #314 on GitHub 1338 loop = self.create_event_loop() 1339 read_proto = MyReadPipeProto(loop=loop) 1340 write_proto = MyWritePipeProto(loop=loop) 1341 1342 rpipe, wpipe = os.pipe() 1343 rpipeobj = io.open(rpipe, 'rb', 1024) 1344 wpipeobj = io.open(wpipe, 'w', 1024) 1345 1346 async def connect(): 1347 read_transport, _ = await loop.connect_read_pipe( 1348 lambda: read_proto, rpipeobj) 1349 write_transport, _ = await loop.connect_write_pipe( 1350 lambda: write_proto, wpipeobj) 1351 return read_transport, write_transport 1352 1353 # Run and close the loop without closing the transports 1354 read_transport, write_transport = loop.run_until_complete(connect()) 1355 loop.close() 1356 1357 # These 'repr' calls used to raise an AttributeError 1358 # See Issue #314 on GitHub 1359 self.assertIn('open', repr(read_transport)) 1360 self.assertIn('open', repr(write_transport)) 1361 1362 # Clean up (avoid ResourceWarning) 1363 rpipeobj.close() 1364 wpipeobj.close() 1365 read_transport._pipe = None 1366 write_transport._pipe = None 1367 1368 @unittest.skipUnless(sys.platform != 'win32', 1369 "Don't support pipes for Windows") 1370 def test_read_pty_output(self): 1371 proto = MyReadPipeProto(loop=self.loop) 1372 1373 master, slave = os.openpty() 1374 master_read_obj = io.open(master, 'rb', 0) 1375 1376 async def connect(): 1377 t, p = await self.loop.connect_read_pipe(lambda: proto, 1378 master_read_obj) 1379 self.assertIs(p, proto) 1380 self.assertIs(t, proto.transport) 1381 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1382 self.assertEqual(0, proto.nbytes) 1383 1384 self.loop.run_until_complete(connect()) 1385 1386 os.write(slave, b'1') 1387 test_utils.run_until(self.loop, lambda: proto.nbytes) 1388 self.assertEqual(1, proto.nbytes) 1389 1390 os.write(slave, b'2345') 1391 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1392 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1393 
self.assertEqual(5, proto.nbytes) 1394 1395 os.close(slave) 1396 proto.transport.close() 1397 self.loop.run_until_complete(proto.done) 1398 self.assertEqual( 1399 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1400 # extra info is available 1401 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1402 1403 @unittest.skipUnless(sys.platform != 'win32', 1404 "Don't support pipes for Windows") 1405 def test_write_pipe(self): 1406 rpipe, wpipe = os.pipe() 1407 pipeobj = io.open(wpipe, 'wb', 1024) 1408 1409 proto = MyWritePipeProto(loop=self.loop) 1410 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1411 transport, p = self.loop.run_until_complete(connect) 1412 self.assertIs(p, proto) 1413 self.assertIs(transport, proto.transport) 1414 self.assertEqual('CONNECTED', proto.state) 1415 1416 transport.write(b'1') 1417 1418 data = bytearray() 1419 def reader(data): 1420 chunk = os.read(rpipe, 1024) 1421 data += chunk 1422 return len(data) 1423 1424 test_utils.run_until(self.loop, lambda: reader(data) >= 1) 1425 self.assertEqual(b'1', data) 1426 1427 transport.write(b'2345') 1428 test_utils.run_until(self.loop, lambda: reader(data) >= 5) 1429 self.assertEqual(b'12345', data) 1430 self.assertEqual('CONNECTED', proto.state) 1431 1432 os.close(rpipe) 1433 1434 # extra info is available 1435 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1436 1437 # close connection 1438 proto.transport.close() 1439 self.loop.run_until_complete(proto.done) 1440 self.assertEqual('CLOSED', proto.state) 1441 1442 @unittest.skipUnless(sys.platform != 'win32', 1443 "Don't support pipes for Windows") 1444 def test_write_pipe_disconnect_on_close(self): 1445 rsock, wsock = socket.socketpair() 1446 rsock.setblocking(False) 1447 pipeobj = io.open(wsock.detach(), 'wb', 1024) 1448 1449 proto = MyWritePipeProto(loop=self.loop) 1450 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1451 transport, p = self.loop.run_until_complete(connect) 1452 
self.assertIs(p, proto) 1453 self.assertIs(transport, proto.transport) 1454 self.assertEqual('CONNECTED', proto.state) 1455 1456 transport.write(b'1') 1457 data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024)) 1458 self.assertEqual(b'1', data) 1459 1460 rsock.close() 1461 1462 self.loop.run_until_complete(proto.done) 1463 self.assertEqual('CLOSED', proto.state) 1464 1465 @unittest.skipUnless(sys.platform != 'win32', 1466 "Don't support pipes for Windows") 1467 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1468 # older than 10.6 (Snow Leopard) 1469 @support.requires_mac_ver(10, 6) 1470 def test_write_pty(self): 1471 master, slave = os.openpty() 1472 slave_write_obj = io.open(slave, 'wb', 0) 1473 1474 proto = MyWritePipeProto(loop=self.loop) 1475 connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj) 1476 transport, p = self.loop.run_until_complete(connect) 1477 self.assertIs(p, proto) 1478 self.assertIs(transport, proto.transport) 1479 self.assertEqual('CONNECTED', proto.state) 1480 1481 transport.write(b'1') 1482 1483 data = bytearray() 1484 def reader(data): 1485 chunk = os.read(master, 1024) 1486 data += chunk 1487 return len(data) 1488 1489 test_utils.run_until(self.loop, lambda: reader(data) >= 1, 1490 timeout=10) 1491 self.assertEqual(b'1', data) 1492 1493 transport.write(b'2345') 1494 test_utils.run_until(self.loop, lambda: reader(data) >= 5, 1495 timeout=10) 1496 self.assertEqual(b'12345', data) 1497 self.assertEqual('CONNECTED', proto.state) 1498 1499 os.close(master) 1500 1501 # extra info is available 1502 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1503 1504 # close connection 1505 proto.transport.close() 1506 self.loop.run_until_complete(proto.done) 1507 self.assertEqual('CLOSED', proto.state) 1508 1509 @unittest.skipUnless(sys.platform != 'win32', 1510 "Don't support pipes for Windows") 1511 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1512 # 
older than 10.6 (Snow Leopard) 1513 @support.requires_mac_ver(10, 6) 1514 def test_bidirectional_pty(self): 1515 master, read_slave = os.openpty() 1516 write_slave = os.dup(read_slave) 1517 tty.setraw(read_slave) 1518 1519 slave_read_obj = io.open(read_slave, 'rb', 0) 1520 read_proto = MyReadPipeProto(loop=self.loop) 1521 read_connect = self.loop.connect_read_pipe(lambda: read_proto, 1522 slave_read_obj) 1523 read_transport, p = self.loop.run_until_complete(read_connect) 1524 self.assertIs(p, read_proto) 1525 self.assertIs(read_transport, read_proto.transport) 1526 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1527 self.assertEqual(0, read_proto.nbytes) 1528 1529 1530 slave_write_obj = io.open(write_slave, 'wb', 0) 1531 write_proto = MyWritePipeProto(loop=self.loop) 1532 write_connect = self.loop.connect_write_pipe(lambda: write_proto, 1533 slave_write_obj) 1534 write_transport, p = self.loop.run_until_complete(write_connect) 1535 self.assertIs(p, write_proto) 1536 self.assertIs(write_transport, write_proto.transport) 1537 self.assertEqual('CONNECTED', write_proto.state) 1538 1539 data = bytearray() 1540 def reader(data): 1541 chunk = os.read(master, 1024) 1542 data += chunk 1543 return len(data) 1544 1545 write_transport.write(b'1') 1546 test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10) 1547 self.assertEqual(b'1', data) 1548 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1549 self.assertEqual('CONNECTED', write_proto.state) 1550 1551 os.write(master, b'a') 1552 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, 1553 timeout=10) 1554 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1555 self.assertEqual(1, read_proto.nbytes) 1556 self.assertEqual('CONNECTED', write_proto.state) 1557 1558 write_transport.write(b'2345') 1559 test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10) 1560 self.assertEqual(b'12345', data) 1561 self.assertEqual(['INITIAL', 'CONNECTED'], 
read_proto.state) 1562 self.assertEqual('CONNECTED', write_proto.state) 1563 1564 os.write(master, b'bcde') 1565 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, 1566 timeout=10) 1567 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1568 self.assertEqual(5, read_proto.nbytes) 1569 self.assertEqual('CONNECTED', write_proto.state) 1570 1571 os.close(master) 1572 1573 read_transport.close() 1574 self.loop.run_until_complete(read_proto.done) 1575 self.assertEqual( 1576 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) 1577 1578 write_transport.close() 1579 self.loop.run_until_complete(write_proto.done) 1580 self.assertEqual('CLOSED', write_proto.state) 1581 1582 def test_prompt_cancellation(self): 1583 r, w = socket.socketpair() 1584 r.setblocking(False) 1585 f = self.loop.create_task(self.loop.sock_recv(r, 1)) 1586 ov = getattr(f, 'ov', None) 1587 if ov is not None: 1588 self.assertTrue(ov.pending) 1589 1590 async def main(): 1591 try: 1592 self.loop.call_soon(f.cancel) 1593 await f 1594 except asyncio.CancelledError: 1595 res = 'cancelled' 1596 else: 1597 res = None 1598 finally: 1599 self.loop.stop() 1600 return res 1601 1602 start = time.monotonic() 1603 t = self.loop.create_task(main()) 1604 self.loop.run_forever() 1605 elapsed = time.monotonic() - start 1606 1607 self.assertLess(elapsed, 0.1) 1608 self.assertEqual(t.result(), 'cancelled') 1609 self.assertRaises(asyncio.CancelledError, f.result) 1610 if ov is not None: 1611 self.assertFalse(ov.pending) 1612 self.loop._stop_serving(r) 1613 1614 r.close() 1615 w.close() 1616 1617 def test_timeout_rounding(self): 1618 def _run_once(): 1619 self.loop._run_once_counter += 1 1620 orig_run_once() 1621 1622 orig_run_once = self.loop._run_once 1623 self.loop._run_once_counter = 0 1624 self.loop._run_once = _run_once 1625 1626 async def wait(): 1627 loop = self.loop 1628 await asyncio.sleep(1e-2) 1629 await asyncio.sleep(1e-4) 1630 await asyncio.sleep(1e-6) 1631 await 
asyncio.sleep(1e-8) 1632 await asyncio.sleep(1e-10) 1633 1634 self.loop.run_until_complete(wait()) 1635 # The ideal number of call is 12, but on some platforms, the selector 1636 # may sleep at little bit less than timeout depending on the resolution 1637 # of the clock used by the kernel. Tolerate a few useless calls on 1638 # these platforms. 1639 self.assertLessEqual(self.loop._run_once_counter, 20, 1640 {'clock_resolution': self.loop._clock_resolution, 1641 'selector': self.loop._selector.__class__.__name__}) 1642 1643 def test_remove_fds_after_closing(self): 1644 loop = self.create_event_loop() 1645 callback = lambda: None 1646 r, w = socket.socketpair() 1647 self.addCleanup(r.close) 1648 self.addCleanup(w.close) 1649 loop.add_reader(r, callback) 1650 loop.add_writer(w, callback) 1651 loop.close() 1652 self.assertFalse(loop.remove_reader(r)) 1653 self.assertFalse(loop.remove_writer(w)) 1654 1655 def test_add_fds_after_closing(self): 1656 loop = self.create_event_loop() 1657 callback = lambda: None 1658 r, w = socket.socketpair() 1659 self.addCleanup(r.close) 1660 self.addCleanup(w.close) 1661 loop.close() 1662 with self.assertRaises(RuntimeError): 1663 loop.add_reader(r, callback) 1664 with self.assertRaises(RuntimeError): 1665 loop.add_writer(w, callback) 1666 1667 def test_close_running_event_loop(self): 1668 async def close_loop(loop): 1669 self.loop.close() 1670 1671 coro = close_loop(self.loop) 1672 with self.assertRaises(RuntimeError): 1673 self.loop.run_until_complete(coro) 1674 1675 def test_close(self): 1676 self.loop.close() 1677 1678 async def test(): 1679 pass 1680 1681 func = lambda: False 1682 coro = test() 1683 self.addCleanup(coro.close) 1684 1685 # operation blocked when the loop is closed 1686 with self.assertRaises(RuntimeError): 1687 self.loop.run_forever() 1688 with self.assertRaises(RuntimeError): 1689 fut = self.loop.create_future() 1690 self.loop.run_until_complete(fut) 1691 with self.assertRaises(RuntimeError): 1692 
self.loop.call_soon(func) 1693 with self.assertRaises(RuntimeError): 1694 self.loop.call_soon_threadsafe(func) 1695 with self.assertRaises(RuntimeError): 1696 self.loop.call_later(1.0, func) 1697 with self.assertRaises(RuntimeError): 1698 self.loop.call_at(self.loop.time() + .0, func) 1699 with self.assertRaises(RuntimeError): 1700 self.loop.create_task(coro) 1701 with self.assertRaises(RuntimeError): 1702 self.loop.add_signal_handler(signal.SIGTERM, func) 1703 1704 # run_in_executor test is tricky: the method is a coroutine, 1705 # but run_until_complete cannot be called on closed loop. 1706 # Thus iterate once explicitly. 1707 with self.assertRaises(RuntimeError): 1708 it = self.loop.run_in_executor(None, func).__await__() 1709 next(it) 1710 1711 1712class SubprocessTestsMixin: 1713 1714 def check_terminated(self, returncode): 1715 if sys.platform == 'win32': 1716 self.assertIsInstance(returncode, int) 1717 # expect 1 but sometimes get 0 1718 else: 1719 self.assertEqual(-signal.SIGTERM, returncode) 1720 1721 def check_killed(self, returncode): 1722 if sys.platform == 'win32': 1723 self.assertIsInstance(returncode, int) 1724 # expect 1 but sometimes get 0 1725 else: 1726 self.assertEqual(-signal.SIGKILL, returncode) 1727 1728 def test_subprocess_exec(self): 1729 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1730 1731 connect = self.loop.subprocess_exec( 1732 functools.partial(MySubprocessProtocol, self.loop), 1733 sys.executable, prog) 1734 with self.assertWarns(DeprecationWarning): 1735 transp, proto = self.loop.run_until_complete(connect) 1736 self.assertIsInstance(proto, MySubprocessProtocol) 1737 self.loop.run_until_complete(proto.connected) 1738 self.assertEqual('CONNECTED', proto.state) 1739 1740 stdin = transp.get_pipe_transport(0) 1741 stdin.write(b'Python The Winner') 1742 self.loop.run_until_complete(proto.got_data[1].wait()) 1743 with test_utils.disable_logger(): 1744 transp.close() 1745 self.loop.run_until_complete(proto.completed) 1746 
self.check_killed(proto.returncode) 1747 self.assertEqual(b'Python The Winner', proto.data[1]) 1748 1749 def test_subprocess_interactive(self): 1750 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1751 1752 connect = self.loop.subprocess_exec( 1753 functools.partial(MySubprocessProtocol, self.loop), 1754 sys.executable, prog) 1755 1756 with self.assertWarns(DeprecationWarning): 1757 transp, proto = self.loop.run_until_complete(connect) 1758 self.assertIsInstance(proto, MySubprocessProtocol) 1759 self.loop.run_until_complete(proto.connected) 1760 self.assertEqual('CONNECTED', proto.state) 1761 1762 stdin = transp.get_pipe_transport(0) 1763 stdin.write(b'Python ') 1764 self.loop.run_until_complete(proto.got_data[1].wait()) 1765 proto.got_data[1].clear() 1766 self.assertEqual(b'Python ', proto.data[1]) 1767 1768 stdin.write(b'The Winner') 1769 self.loop.run_until_complete(proto.got_data[1].wait()) 1770 self.assertEqual(b'Python The Winner', proto.data[1]) 1771 1772 with test_utils.disable_logger(): 1773 transp.close() 1774 self.loop.run_until_complete(proto.completed) 1775 self.check_killed(proto.returncode) 1776 1777 def test_subprocess_shell(self): 1778 with self.assertWarns(DeprecationWarning): 1779 connect = self.loop.subprocess_shell( 1780 functools.partial(MySubprocessProtocol, self.loop), 1781 'echo Python') 1782 transp, proto = self.loop.run_until_complete(connect) 1783 self.assertIsInstance(proto, MySubprocessProtocol) 1784 self.loop.run_until_complete(proto.connected) 1785 1786 transp.get_pipe_transport(0).close() 1787 self.loop.run_until_complete(proto.completed) 1788 self.assertEqual(0, proto.returncode) 1789 self.assertTrue(all(f.done() for f in proto.disconnects.values())) 1790 self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') 1791 self.assertEqual(proto.data[2], b'') 1792 transp.close() 1793 1794 def test_subprocess_exitcode(self): 1795 connect = self.loop.subprocess_shell( 1796 functools.partial(MySubprocessProtocol, self.loop), 1797 
'exit 7', stdin=None, stdout=None, stderr=None) 1798 1799 with self.assertWarns(DeprecationWarning): 1800 transp, proto = self.loop.run_until_complete(connect) 1801 self.assertIsInstance(proto, MySubprocessProtocol) 1802 self.loop.run_until_complete(proto.completed) 1803 self.assertEqual(7, proto.returncode) 1804 transp.close() 1805 1806 def test_subprocess_close_after_finish(self): 1807 connect = self.loop.subprocess_shell( 1808 functools.partial(MySubprocessProtocol, self.loop), 1809 'exit 7', stdin=None, stdout=None, stderr=None) 1810 with self.assertWarns(DeprecationWarning): 1811 transp, proto = self.loop.run_until_complete(connect) 1812 self.assertIsInstance(proto, MySubprocessProtocol) 1813 self.assertIsNone(transp.get_pipe_transport(0)) 1814 self.assertIsNone(transp.get_pipe_transport(1)) 1815 self.assertIsNone(transp.get_pipe_transport(2)) 1816 self.loop.run_until_complete(proto.completed) 1817 self.assertEqual(7, proto.returncode) 1818 self.assertIsNone(transp.close()) 1819 1820 def test_subprocess_kill(self): 1821 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1822 1823 connect = self.loop.subprocess_exec( 1824 functools.partial(MySubprocessProtocol, self.loop), 1825 sys.executable, prog) 1826 1827 with self.assertWarns(DeprecationWarning): 1828 transp, proto = self.loop.run_until_complete(connect) 1829 self.assertIsInstance(proto, MySubprocessProtocol) 1830 self.loop.run_until_complete(proto.connected) 1831 1832 transp.kill() 1833 self.loop.run_until_complete(proto.completed) 1834 self.check_killed(proto.returncode) 1835 transp.close() 1836 1837 def test_subprocess_terminate(self): 1838 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1839 1840 connect = self.loop.subprocess_exec( 1841 functools.partial(MySubprocessProtocol, self.loop), 1842 sys.executable, prog) 1843 1844 with self.assertWarns(DeprecationWarning): 1845 transp, proto = self.loop.run_until_complete(connect) 1846 self.assertIsInstance(proto, MySubprocessProtocol) 1847 
self.loop.run_until_complete(proto.connected) 1848 1849 transp.terminate() 1850 self.loop.run_until_complete(proto.completed) 1851 self.check_terminated(proto.returncode) 1852 transp.close() 1853 1854 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") 1855 def test_subprocess_send_signal(self): 1856 # bpo-31034: Make sure that we get the default signal handler (killing 1857 # the process). The parent process may have decided to ignore SIGHUP, 1858 # and signal handlers are inherited. 1859 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) 1860 try: 1861 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1862 1863 connect = self.loop.subprocess_exec( 1864 functools.partial(MySubprocessProtocol, self.loop), 1865 sys.executable, prog) 1866 1867 with self.assertWarns(DeprecationWarning): 1868 transp, proto = self.loop.run_until_complete(connect) 1869 self.assertIsInstance(proto, MySubprocessProtocol) 1870 self.loop.run_until_complete(proto.connected) 1871 1872 transp.send_signal(signal.SIGHUP) 1873 self.loop.run_until_complete(proto.completed) 1874 self.assertEqual(-signal.SIGHUP, proto.returncode) 1875 transp.close() 1876 finally: 1877 signal.signal(signal.SIGHUP, old_handler) 1878 1879 def test_subprocess_stderr(self): 1880 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1881 1882 connect = self.loop.subprocess_exec( 1883 functools.partial(MySubprocessProtocol, self.loop), 1884 sys.executable, prog) 1885 1886 with self.assertWarns(DeprecationWarning): 1887 transp, proto = self.loop.run_until_complete(connect) 1888 self.assertIsInstance(proto, MySubprocessProtocol) 1889 self.loop.run_until_complete(proto.connected) 1890 1891 stdin = transp.get_pipe_transport(0) 1892 stdin.write(b'test') 1893 1894 self.loop.run_until_complete(proto.completed) 1895 1896 transp.close() 1897 self.assertEqual(b'OUT:test', proto.data[1]) 1898 self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2]) 1899 self.assertEqual(0, proto.returncode) 
1900 1901 def test_subprocess_stderr_redirect_to_stdout(self): 1902 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1903 1904 connect = self.loop.subprocess_exec( 1905 functools.partial(MySubprocessProtocol, self.loop), 1906 sys.executable, prog, stderr=subprocess.STDOUT) 1907 1908 with self.assertWarns(DeprecationWarning): 1909 transp, proto = self.loop.run_until_complete(connect) 1910 self.assertIsInstance(proto, MySubprocessProtocol) 1911 self.loop.run_until_complete(proto.connected) 1912 1913 stdin = transp.get_pipe_transport(0) 1914 self.assertIsNotNone(transp.get_pipe_transport(1)) 1915 self.assertIsNone(transp.get_pipe_transport(2)) 1916 1917 stdin.write(b'test') 1918 self.loop.run_until_complete(proto.completed) 1919 self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'), 1920 proto.data[1]) 1921 self.assertEqual(b'', proto.data[2]) 1922 1923 transp.close() 1924 self.assertEqual(0, proto.returncode) 1925 1926 def test_subprocess_close_client_stream(self): 1927 prog = os.path.join(os.path.dirname(__file__), 'echo3.py') 1928 1929 connect = self.loop.subprocess_exec( 1930 functools.partial(MySubprocessProtocol, self.loop), 1931 sys.executable, prog) 1932 with self.assertWarns(DeprecationWarning): 1933 transp, proto = self.loop.run_until_complete(connect) 1934 self.assertIsInstance(proto, MySubprocessProtocol) 1935 self.loop.run_until_complete(proto.connected) 1936 1937 stdin = transp.get_pipe_transport(0) 1938 stdout = transp.get_pipe_transport(1) 1939 stdin.write(b'test') 1940 self.loop.run_until_complete(proto.got_data[1].wait()) 1941 self.assertEqual(b'OUT:test', proto.data[1]) 1942 1943 stdout.close() 1944 self.loop.run_until_complete(proto.disconnects[1]) 1945 stdin.write(b'xxx') 1946 self.loop.run_until_complete(proto.got_data[2].wait()) 1947 if sys.platform != 'win32': 1948 self.assertEqual(b'ERR:BrokenPipeError', proto.data[2]) 1949 else: 1950 # After closing the read-end of a pipe, writing to the 1951 # write-end using os.write() 
fails with errno==EINVAL and 1952 # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using 1953 # WriteFile() we get ERROR_BROKEN_PIPE as expected.) 1954 self.assertEqual(b'ERR:OSError', proto.data[2]) 1955 with test_utils.disable_logger(): 1956 transp.close() 1957 self.loop.run_until_complete(proto.completed) 1958 self.check_killed(proto.returncode) 1959 1960 def test_subprocess_wait_no_same_group(self): 1961 # start the new process in a new session 1962 connect = self.loop.subprocess_shell( 1963 functools.partial(MySubprocessProtocol, self.loop), 1964 'exit 7', stdin=None, stdout=None, stderr=None, 1965 start_new_session=True) 1966 _, proto = yield self.loop.run_until_complete(connect) 1967 self.assertIsInstance(proto, MySubprocessProtocol) 1968 self.loop.run_until_complete(proto.completed) 1969 self.assertEqual(7, proto.returncode) 1970 1971 def test_subprocess_exec_invalid_args(self): 1972 async def connect(**kwds): 1973 await self.loop.subprocess_exec( 1974 asyncio.SubprocessProtocol, 1975 'pwd', **kwds) 1976 1977 with self.assertRaises(ValueError): 1978 self.loop.run_until_complete(connect(universal_newlines=True)) 1979 with self.assertRaises(ValueError): 1980 self.loop.run_until_complete(connect(bufsize=4096)) 1981 with self.assertRaises(ValueError): 1982 self.loop.run_until_complete(connect(shell=True)) 1983 1984 def test_subprocess_shell_invalid_args(self): 1985 1986 async def connect(cmd=None, **kwds): 1987 if not cmd: 1988 cmd = 'pwd' 1989 await self.loop.subprocess_shell( 1990 asyncio.SubprocessProtocol, 1991 cmd, **kwds) 1992 1993 with self.assertRaises(ValueError): 1994 self.loop.run_until_complete(connect(['ls', '-l'])) 1995 with self.assertRaises(ValueError): 1996 self.loop.run_until_complete(connect(universal_newlines=True)) 1997 with self.assertRaises(ValueError): 1998 self.loop.run_until_complete(connect(bufsize=4096)) 1999 with self.assertRaises(ValueError): 2000 self.loop.run_until_complete(connect(shell=False)) 2001 2002 2003if sys.platform 
== 'win32': 2004 2005 class SelectEventLoopTests(EventLoopTestsMixin, 2006 test_utils.TestCase): 2007 2008 def create_event_loop(self): 2009 return asyncio.SelectorEventLoop() 2010 2011 class ProactorEventLoopTests(EventLoopTestsMixin, 2012 SubprocessTestsMixin, 2013 test_utils.TestCase): 2014 2015 def create_event_loop(self): 2016 return asyncio.ProactorEventLoop() 2017 2018 def test_reader_callback(self): 2019 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2020 2021 def test_reader_callback_cancel(self): 2022 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2023 2024 def test_writer_callback(self): 2025 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2026 2027 def test_writer_callback_cancel(self): 2028 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2029 2030 def test_remove_fds_after_closing(self): 2031 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2032else: 2033 import selectors 2034 2035 class UnixEventLoopTestsMixin(EventLoopTestsMixin): 2036 def setUp(self): 2037 super().setUp() 2038 watcher = asyncio.SafeChildWatcher() 2039 watcher.attach_loop(self.loop) 2040 asyncio.set_child_watcher(watcher) 2041 2042 def tearDown(self): 2043 asyncio.set_child_watcher(None) 2044 super().tearDown() 2045 2046 2047 if hasattr(selectors, 'KqueueSelector'): 2048 class KqueueEventLoopTests(UnixEventLoopTestsMixin, 2049 SubprocessTestsMixin, 2050 test_utils.TestCase): 2051 2052 def create_event_loop(self): 2053 return asyncio.SelectorEventLoop( 2054 selectors.KqueueSelector()) 2055 2056 # kqueue doesn't support character devices (PTY) on Mac OS X older 2057 # than 10.9 (Maverick) 2058 @support.requires_mac_ver(10, 9) 2059 # Issue #20667: KqueueEventLoopTests.test_read_pty_output() 2060 # hangs on OpenBSD 5.5 2061 @unittest.skipIf(sys.platform.startswith('openbsd'), 2062 'test hangs on OpenBSD') 2063 def test_read_pty_output(self): 2064 super().test_read_pty_output() 2065 2066 # 
kqueue doesn't support character devices (PTY) on Mac OS X older 2067 # than 10.9 (Maverick) 2068 @support.requires_mac_ver(10, 9) 2069 def test_write_pty(self): 2070 super().test_write_pty() 2071 2072 if hasattr(selectors, 'EpollSelector'): 2073 class EPollEventLoopTests(UnixEventLoopTestsMixin, 2074 SubprocessTestsMixin, 2075 test_utils.TestCase): 2076 2077 def create_event_loop(self): 2078 return asyncio.SelectorEventLoop(selectors.EpollSelector()) 2079 2080 if hasattr(selectors, 'PollSelector'): 2081 class PollEventLoopTests(UnixEventLoopTestsMixin, 2082 SubprocessTestsMixin, 2083 test_utils.TestCase): 2084 2085 def create_event_loop(self): 2086 return asyncio.SelectorEventLoop(selectors.PollSelector()) 2087 2088 # Should always exist. 2089 class SelectEventLoopTests(UnixEventLoopTestsMixin, 2090 SubprocessTestsMixin, 2091 test_utils.TestCase): 2092 2093 def create_event_loop(self): 2094 return asyncio.SelectorEventLoop(selectors.SelectSelector()) 2095 2096 2097def noop(*args, **kwargs): 2098 pass 2099 2100 2101class HandleTests(test_utils.TestCase): 2102 2103 def setUp(self): 2104 super().setUp() 2105 self.loop = mock.Mock() 2106 self.loop.get_debug.return_value = True 2107 2108 def test_handle(self): 2109 def callback(*args): 2110 return args 2111 2112 args = () 2113 h = asyncio.Handle(callback, args, self.loop) 2114 self.assertIs(h._callback, callback) 2115 self.assertIs(h._args, args) 2116 self.assertFalse(h.cancelled()) 2117 2118 h.cancel() 2119 self.assertTrue(h.cancelled()) 2120 2121 def test_callback_with_exception(self): 2122 def callback(): 2123 raise ValueError() 2124 2125 self.loop = mock.Mock() 2126 self.loop.call_exception_handler = mock.Mock() 2127 2128 h = asyncio.Handle(callback, (), self.loop) 2129 h._run() 2130 2131 self.loop.call_exception_handler.assert_called_with({ 2132 'message': test_utils.MockPattern('Exception in callback.*'), 2133 'exception': mock.ANY, 2134 'handle': h, 2135 'source_traceback': h._source_traceback, 2136 }) 2137 2138 
def test_handle_weakref(self): 2139 wd = weakref.WeakValueDictionary() 2140 h = asyncio.Handle(lambda: None, (), self.loop) 2141 wd['h'] = h # Would fail without __weakref__ slot. 2142 2143 def test_handle_repr(self): 2144 self.loop.get_debug.return_value = False 2145 2146 # simple function 2147 h = asyncio.Handle(noop, (1, 2), self.loop) 2148 filename, lineno = test_utils.get_function_source(noop) 2149 self.assertEqual(repr(h), 2150 '<Handle noop(1, 2) at %s:%s>' 2151 % (filename, lineno)) 2152 2153 # cancelled handle 2154 h.cancel() 2155 self.assertEqual(repr(h), 2156 '<Handle cancelled>') 2157 2158 # decorated function 2159 with self.assertWarns(DeprecationWarning): 2160 cb = asyncio.coroutine(noop) 2161 h = asyncio.Handle(cb, (), self.loop) 2162 self.assertEqual(repr(h), 2163 '<Handle noop() at %s:%s>' 2164 % (filename, lineno)) 2165 2166 # partial function 2167 cb = functools.partial(noop, 1, 2) 2168 h = asyncio.Handle(cb, (3,), self.loop) 2169 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 2170 % (re.escape(filename), lineno)) 2171 self.assertRegex(repr(h), regex) 2172 2173 # partial function with keyword args 2174 cb = functools.partial(noop, x=1) 2175 h = asyncio.Handle(cb, (2, 3), self.loop) 2176 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 2177 % (re.escape(filename), lineno)) 2178 self.assertRegex(repr(h), regex) 2179 2180 # partial method 2181 if sys.version_info >= (3, 4): 2182 method = HandleTests.test_handle_repr 2183 cb = functools.partialmethod(method) 2184 filename, lineno = test_utils.get_function_source(method) 2185 h = asyncio.Handle(cb, (), self.loop) 2186 2187 cb_regex = r'<function HandleTests.test_handle_repr .*>' 2188 cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex) 2189 regex = (r'^<Handle %s at %s:%s>$' 2190 % (cb_regex, re.escape(filename), lineno)) 2191 self.assertRegex(repr(h), regex) 2192 2193 def test_handle_repr_debug(self): 2194 self.loop.get_debug.return_value = True 2195 2196 # simple function 2197 
create_filename = __file__ 2198 create_lineno = sys._getframe().f_lineno + 1 2199 h = asyncio.Handle(noop, (1, 2), self.loop) 2200 filename, lineno = test_utils.get_function_source(noop) 2201 self.assertEqual(repr(h), 2202 '<Handle noop(1, 2) at %s:%s created at %s:%s>' 2203 % (filename, lineno, create_filename, create_lineno)) 2204 2205 # cancelled handle 2206 h.cancel() 2207 self.assertEqual( 2208 repr(h), 2209 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2210 % (filename, lineno, create_filename, create_lineno)) 2211 2212 # double cancellation won't overwrite _repr 2213 h.cancel() 2214 self.assertEqual( 2215 repr(h), 2216 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2217 % (filename, lineno, create_filename, create_lineno)) 2218 2219 def test_handle_source_traceback(self): 2220 loop = asyncio.get_event_loop_policy().new_event_loop() 2221 loop.set_debug(True) 2222 self.set_event_loop(loop) 2223 2224 def check_source_traceback(h): 2225 lineno = sys._getframe(1).f_lineno - 1 2226 self.assertIsInstance(h._source_traceback, list) 2227 self.assertEqual(h._source_traceback[-1][:3], 2228 (__file__, 2229 lineno, 2230 'test_handle_source_traceback')) 2231 2232 # call_soon 2233 h = loop.call_soon(noop) 2234 check_source_traceback(h) 2235 2236 # call_soon_threadsafe 2237 h = loop.call_soon_threadsafe(noop) 2238 check_source_traceback(h) 2239 2240 # call_later 2241 h = loop.call_later(0, noop) 2242 check_source_traceback(h) 2243 2244 # call_at 2245 h = loop.call_later(0, noop) 2246 check_source_traceback(h) 2247 2248 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 2249 'No collections.abc.Coroutine') 2250 def test_coroutine_like_object_debug_formatting(self): 2251 # Test that asyncio can format coroutines that are instances of 2252 # collections.abc.Coroutine, but lack cr_core or gi_code attributes 2253 # (such as ones compiled with Cython). 
2254 2255 coro = CoroLike() 2256 coro.__name__ = 'AAA' 2257 self.assertTrue(asyncio.iscoroutine(coro)) 2258 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2259 2260 coro.__qualname__ = 'BBB' 2261 self.assertEqual(coroutines._format_coroutine(coro), 'BBB()') 2262 2263 coro.cr_running = True 2264 self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running') 2265 2266 coro.__name__ = coro.__qualname__ = None 2267 self.assertEqual(coroutines._format_coroutine(coro), 2268 '<CoroLike without __name__>() running') 2269 2270 coro = CoroLike() 2271 coro.__qualname__ = 'CoroLike' 2272 # Some coroutines might not have '__name__', such as 2273 # built-in async_gen.asend(). 2274 self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()') 2275 2276 coro = CoroLike() 2277 coro.__qualname__ = 'AAA' 2278 coro.cr_code = None 2279 self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 2280 2281 2282class TimerTests(unittest.TestCase): 2283 2284 def setUp(self): 2285 super().setUp() 2286 self.loop = mock.Mock() 2287 2288 def test_hash(self): 2289 when = time.monotonic() 2290 h = asyncio.TimerHandle(when, lambda: False, (), 2291 mock.Mock()) 2292 self.assertEqual(hash(h), hash(when)) 2293 2294 def test_when(self): 2295 when = time.monotonic() 2296 h = asyncio.TimerHandle(when, lambda: False, (), 2297 mock.Mock()) 2298 self.assertEqual(when, h.when()) 2299 2300 def test_timer(self): 2301 def callback(*args): 2302 return args 2303 2304 args = (1, 2, 3) 2305 when = time.monotonic() 2306 h = asyncio.TimerHandle(when, callback, args, mock.Mock()) 2307 self.assertIs(h._callback, callback) 2308 self.assertIs(h._args, args) 2309 self.assertFalse(h.cancelled()) 2310 2311 # cancel 2312 h.cancel() 2313 self.assertTrue(h.cancelled()) 2314 self.assertIsNone(h._callback) 2315 self.assertIsNone(h._args) 2316 2317 # when cannot be None 2318 self.assertRaises(AssertionError, 2319 asyncio.TimerHandle, None, callback, args, 2320 self.loop) 2321 2322 def 
test_timer_repr(self): 2323 self.loop.get_debug.return_value = False 2324 2325 # simple function 2326 h = asyncio.TimerHandle(123, noop, (), self.loop) 2327 src = test_utils.get_function_source(noop) 2328 self.assertEqual(repr(h), 2329 '<TimerHandle when=123 noop() at %s:%s>' % src) 2330 2331 # cancelled handle 2332 h.cancel() 2333 self.assertEqual(repr(h), 2334 '<TimerHandle cancelled when=123>') 2335 2336 def test_timer_repr_debug(self): 2337 self.loop.get_debug.return_value = True 2338 2339 # simple function 2340 create_filename = __file__ 2341 create_lineno = sys._getframe().f_lineno + 1 2342 h = asyncio.TimerHandle(123, noop, (), self.loop) 2343 filename, lineno = test_utils.get_function_source(noop) 2344 self.assertEqual(repr(h), 2345 '<TimerHandle when=123 noop() ' 2346 'at %s:%s created at %s:%s>' 2347 % (filename, lineno, create_filename, create_lineno)) 2348 2349 # cancelled handle 2350 h.cancel() 2351 self.assertEqual(repr(h), 2352 '<TimerHandle cancelled when=123 noop() ' 2353 'at %s:%s created at %s:%s>' 2354 % (filename, lineno, create_filename, create_lineno)) 2355 2356 2357 def test_timer_comparison(self): 2358 def callback(*args): 2359 return args 2360 2361 when = time.monotonic() 2362 2363 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2364 h2 = asyncio.TimerHandle(when, callback, (), self.loop) 2365 # TODO: Use assertLess etc. 
2366 self.assertFalse(h1 < h2) 2367 self.assertFalse(h2 < h1) 2368 self.assertTrue(h1 <= h2) 2369 self.assertTrue(h2 <= h1) 2370 self.assertFalse(h1 > h2) 2371 self.assertFalse(h2 > h1) 2372 self.assertTrue(h1 >= h2) 2373 self.assertTrue(h2 >= h1) 2374 self.assertTrue(h1 == h2) 2375 self.assertFalse(h1 != h2) 2376 2377 h2.cancel() 2378 self.assertFalse(h1 == h2) 2379 2380 h1 = asyncio.TimerHandle(when, callback, (), self.loop) 2381 h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop) 2382 self.assertTrue(h1 < h2) 2383 self.assertFalse(h2 < h1) 2384 self.assertTrue(h1 <= h2) 2385 self.assertFalse(h2 <= h1) 2386 self.assertFalse(h1 > h2) 2387 self.assertTrue(h2 > h1) 2388 self.assertFalse(h1 >= h2) 2389 self.assertTrue(h2 >= h1) 2390 self.assertFalse(h1 == h2) 2391 self.assertTrue(h1 != h2) 2392 2393 h3 = asyncio.Handle(callback, (), self.loop) 2394 self.assertIs(NotImplemented, h1.__eq__(h3)) 2395 self.assertIs(NotImplemented, h1.__ne__(h3)) 2396 2397 2398class AbstractEventLoopTests(unittest.TestCase): 2399 2400 def test_not_implemented(self): 2401 f = mock.Mock() 2402 loop = asyncio.AbstractEventLoop() 2403 self.assertRaises( 2404 NotImplementedError, loop.run_forever) 2405 self.assertRaises( 2406 NotImplementedError, loop.run_until_complete, None) 2407 self.assertRaises( 2408 NotImplementedError, loop.stop) 2409 self.assertRaises( 2410 NotImplementedError, loop.is_running) 2411 self.assertRaises( 2412 NotImplementedError, loop.is_closed) 2413 self.assertRaises( 2414 NotImplementedError, loop.close) 2415 self.assertRaises( 2416 NotImplementedError, loop.create_task, None) 2417 self.assertRaises( 2418 NotImplementedError, loop.call_later, None, None) 2419 self.assertRaises( 2420 NotImplementedError, loop.call_at, f, f) 2421 self.assertRaises( 2422 NotImplementedError, loop.call_soon, None) 2423 self.assertRaises( 2424 NotImplementedError, loop.time) 2425 self.assertRaises( 2426 NotImplementedError, loop.call_soon_threadsafe, None) 2427 self.assertRaises( 
2428 NotImplementedError, loop.set_default_executor, f) 2429 self.assertRaises( 2430 NotImplementedError, loop.add_reader, 1, f) 2431 self.assertRaises( 2432 NotImplementedError, loop.remove_reader, 1) 2433 self.assertRaises( 2434 NotImplementedError, loop.add_writer, 1, f) 2435 self.assertRaises( 2436 NotImplementedError, loop.remove_writer, 1) 2437 self.assertRaises( 2438 NotImplementedError, loop.add_signal_handler, 1, f) 2439 self.assertRaises( 2440 NotImplementedError, loop.remove_signal_handler, 1) 2441 self.assertRaises( 2442 NotImplementedError, loop.remove_signal_handler, 1) 2443 self.assertRaises( 2444 NotImplementedError, loop.set_exception_handler, f) 2445 self.assertRaises( 2446 NotImplementedError, loop.default_exception_handler, f) 2447 self.assertRaises( 2448 NotImplementedError, loop.call_exception_handler, f) 2449 self.assertRaises( 2450 NotImplementedError, loop.get_debug) 2451 self.assertRaises( 2452 NotImplementedError, loop.set_debug, f) 2453 2454 def test_not_implemented_async(self): 2455 2456 async def inner(): 2457 f = mock.Mock() 2458 loop = asyncio.AbstractEventLoop() 2459 2460 with self.assertRaises(NotImplementedError): 2461 await loop.run_in_executor(f, f) 2462 with self.assertRaises(NotImplementedError): 2463 await loop.getaddrinfo('localhost', 8080) 2464 with self.assertRaises(NotImplementedError): 2465 await loop.getnameinfo(('localhost', 8080)) 2466 with self.assertRaises(NotImplementedError): 2467 await loop.create_connection(f) 2468 with self.assertRaises(NotImplementedError): 2469 await loop.create_server(f) 2470 with self.assertRaises(NotImplementedError): 2471 await loop.create_datagram_endpoint(f) 2472 with self.assertRaises(NotImplementedError): 2473 await loop.sock_recv(f, 10) 2474 with self.assertRaises(NotImplementedError): 2475 await loop.sock_recv_into(f, 10) 2476 with self.assertRaises(NotImplementedError): 2477 await loop.sock_sendall(f, 10) 2478 with self.assertRaises(NotImplementedError): 2479 await 
loop.sock_connect(f, f) 2480 with self.assertRaises(NotImplementedError): 2481 await loop.sock_accept(f) 2482 with self.assertRaises(NotImplementedError): 2483 await loop.sock_sendfile(f, f) 2484 with self.assertRaises(NotImplementedError): 2485 await loop.sendfile(f, f) 2486 with self.assertRaises(NotImplementedError): 2487 await loop.connect_read_pipe(f, mock.sentinel.pipe) 2488 with self.assertRaises(NotImplementedError): 2489 await loop.connect_write_pipe(f, mock.sentinel.pipe) 2490 with self.assertRaises(NotImplementedError): 2491 await loop.subprocess_shell(f, mock.sentinel) 2492 with self.assertRaises(NotImplementedError): 2493 await loop.subprocess_exec(f) 2494 2495 loop = asyncio.new_event_loop() 2496 loop.run_until_complete(inner()) 2497 loop.close() 2498 2499 2500class PolicyTests(unittest.TestCase): 2501 2502 def test_event_loop_policy(self): 2503 policy = asyncio.AbstractEventLoopPolicy() 2504 self.assertRaises(NotImplementedError, policy.get_event_loop) 2505 self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 2506 self.assertRaises(NotImplementedError, policy.new_event_loop) 2507 self.assertRaises(NotImplementedError, policy.get_child_watcher) 2508 self.assertRaises(NotImplementedError, policy.set_child_watcher, 2509 object()) 2510 2511 def test_get_event_loop(self): 2512 policy = asyncio.DefaultEventLoopPolicy() 2513 self.assertIsNone(policy._local._loop) 2514 2515 loop = policy.get_event_loop() 2516 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2517 2518 self.assertIs(policy._local._loop, loop) 2519 self.assertIs(loop, policy.get_event_loop()) 2520 loop.close() 2521 2522 def test_get_event_loop_calls_set_event_loop(self): 2523 policy = asyncio.DefaultEventLoopPolicy() 2524 2525 with mock.patch.object( 2526 policy, "set_event_loop", 2527 wraps=policy.set_event_loop) as m_set_event_loop: 2528 2529 loop = policy.get_event_loop() 2530 2531 # policy._local._loop must be set through .set_event_loop() 2532 # (the unix 
DefaultEventLoopPolicy needs this call to attach 2533 # the child watcher correctly) 2534 m_set_event_loop.assert_called_with(loop) 2535 2536 loop.close() 2537 2538 def test_get_event_loop_after_set_none(self): 2539 policy = asyncio.DefaultEventLoopPolicy() 2540 policy.set_event_loop(None) 2541 self.assertRaises(RuntimeError, policy.get_event_loop) 2542 2543 @mock.patch('asyncio.events.threading.current_thread') 2544 def test_get_event_loop_thread(self, m_current_thread): 2545 2546 def f(): 2547 policy = asyncio.DefaultEventLoopPolicy() 2548 self.assertRaises(RuntimeError, policy.get_event_loop) 2549 2550 th = threading.Thread(target=f) 2551 th.start() 2552 th.join() 2553 2554 def test_new_event_loop(self): 2555 policy = asyncio.DefaultEventLoopPolicy() 2556 2557 loop = policy.new_event_loop() 2558 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2559 loop.close() 2560 2561 def test_set_event_loop(self): 2562 policy = asyncio.DefaultEventLoopPolicy() 2563 old_loop = policy.get_event_loop() 2564 2565 self.assertRaises(AssertionError, policy.set_event_loop, object()) 2566 2567 loop = policy.new_event_loop() 2568 policy.set_event_loop(loop) 2569 self.assertIs(loop, policy.get_event_loop()) 2570 self.assertIsNot(old_loop, policy.get_event_loop()) 2571 loop.close() 2572 old_loop.close() 2573 2574 def test_get_event_loop_policy(self): 2575 policy = asyncio.get_event_loop_policy() 2576 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 2577 self.assertIs(policy, asyncio.get_event_loop_policy()) 2578 2579 def test_set_event_loop_policy(self): 2580 self.assertRaises( 2581 AssertionError, asyncio.set_event_loop_policy, object()) 2582 2583 old_policy = asyncio.get_event_loop_policy() 2584 2585 policy = asyncio.DefaultEventLoopPolicy() 2586 asyncio.set_event_loop_policy(policy) 2587 self.assertIs(policy, asyncio.get_event_loop_policy()) 2588 self.assertIsNot(policy, old_policy) 2589 2590 2591class GetEventLoopTestsMixin: 2592 2593 _get_running_loop_impl = 
None 2594 _set_running_loop_impl = None 2595 get_running_loop_impl = None 2596 get_event_loop_impl = None 2597 2598 def setUp(self): 2599 self._get_running_loop_saved = events._get_running_loop 2600 self._set_running_loop_saved = events._set_running_loop 2601 self.get_running_loop_saved = events.get_running_loop 2602 self.get_event_loop_saved = events.get_event_loop 2603 2604 events._get_running_loop = type(self)._get_running_loop_impl 2605 events._set_running_loop = type(self)._set_running_loop_impl 2606 events.get_running_loop = type(self).get_running_loop_impl 2607 events.get_event_loop = type(self).get_event_loop_impl 2608 2609 asyncio._get_running_loop = type(self)._get_running_loop_impl 2610 asyncio._set_running_loop = type(self)._set_running_loop_impl 2611 asyncio.get_running_loop = type(self).get_running_loop_impl 2612 asyncio.get_event_loop = type(self).get_event_loop_impl 2613 2614 super().setUp() 2615 2616 self.loop = asyncio.new_event_loop() 2617 asyncio.set_event_loop(self.loop) 2618 2619 if sys.platform != 'win32': 2620 watcher = asyncio.SafeChildWatcher() 2621 watcher.attach_loop(self.loop) 2622 asyncio.set_child_watcher(watcher) 2623 2624 def tearDown(self): 2625 try: 2626 if sys.platform != 'win32': 2627 asyncio.set_child_watcher(None) 2628 2629 super().tearDown() 2630 finally: 2631 self.loop.close() 2632 asyncio.set_event_loop(None) 2633 2634 events._get_running_loop = self._get_running_loop_saved 2635 events._set_running_loop = self._set_running_loop_saved 2636 events.get_running_loop = self.get_running_loop_saved 2637 events.get_event_loop = self.get_event_loop_saved 2638 2639 asyncio._get_running_loop = self._get_running_loop_saved 2640 asyncio._set_running_loop = self._set_running_loop_saved 2641 asyncio.get_running_loop = self.get_running_loop_saved 2642 asyncio.get_event_loop = self.get_event_loop_saved 2643 2644 if sys.platform != 'win32': 2645 2646 def test_get_event_loop_new_process(self): 2647 # bpo-32126: The multiprocessing module used 
by 2648 # ProcessPoolExecutor is not functional when the 2649 # multiprocessing.synchronize module cannot be imported. 2650 support.skip_if_broken_multiprocessing_synchronize() 2651 2652 async def main(): 2653 pool = concurrent.futures.ProcessPoolExecutor() 2654 result = await self.loop.run_in_executor( 2655 pool, _test_get_event_loop_new_process__sub_proc) 2656 pool.shutdown() 2657 return result 2658 2659 self.assertEqual( 2660 self.loop.run_until_complete(main()), 2661 'hello') 2662 2663 def test_get_event_loop_returns_running_loop(self): 2664 class TestError(Exception): 2665 pass 2666 2667 class Policy(asyncio.DefaultEventLoopPolicy): 2668 def get_event_loop(self): 2669 raise TestError 2670 2671 old_policy = asyncio.get_event_loop_policy() 2672 try: 2673 asyncio.set_event_loop_policy(Policy()) 2674 loop = asyncio.new_event_loop() 2675 2676 with self.assertRaises(TestError): 2677 asyncio.get_event_loop() 2678 asyncio.set_event_loop(None) 2679 with self.assertRaises(TestError): 2680 asyncio.get_event_loop() 2681 2682 with self.assertRaisesRegex(RuntimeError, 'no running'): 2683 self.assertIs(asyncio.get_running_loop(), None) 2684 self.assertIs(asyncio._get_running_loop(), None) 2685 2686 async def func(): 2687 self.assertIs(asyncio.get_event_loop(), loop) 2688 self.assertIs(asyncio.get_running_loop(), loop) 2689 self.assertIs(asyncio._get_running_loop(), loop) 2690 2691 loop.run_until_complete(func()) 2692 2693 asyncio.set_event_loop(loop) 2694 with self.assertRaises(TestError): 2695 asyncio.get_event_loop() 2696 2697 asyncio.set_event_loop(None) 2698 with self.assertRaises(TestError): 2699 asyncio.get_event_loop() 2700 2701 finally: 2702 asyncio.set_event_loop_policy(old_policy) 2703 if loop is not None: 2704 loop.close() 2705 2706 with self.assertRaisesRegex(RuntimeError, 'no running'): 2707 self.assertIs(asyncio.get_running_loop(), None) 2708 2709 self.assertIs(asyncio._get_running_loop(), None) 2710 2711 2712class TestPyGetEventLoop(GetEventLoopTestsMixin, 
unittest.TestCase): 2713 2714 _get_running_loop_impl = events._py__get_running_loop 2715 _set_running_loop_impl = events._py__set_running_loop 2716 get_running_loop_impl = events._py_get_running_loop 2717 get_event_loop_impl = events._py_get_event_loop 2718 2719 2720try: 2721 import _asyncio # NoQA 2722except ImportError: 2723 pass 2724else: 2725 2726 class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 2727 2728 _get_running_loop_impl = events._c__get_running_loop 2729 _set_running_loop_impl = events._c__set_running_loop 2730 get_running_loop_impl = events._c_get_running_loop 2731 get_event_loop_impl = events._c_get_event_loop 2732 2733 2734class TestServer(unittest.TestCase): 2735 2736 def test_get_loop(self): 2737 loop = asyncio.new_event_loop() 2738 self.addCleanup(loop.close) 2739 proto = MyProto(loop) 2740 server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0)) 2741 self.assertEqual(server.get_loop(), loop) 2742 server.close() 2743 loop.run_until_complete(server.wait_closed()) 2744 2745 2746class TestAbstractServer(unittest.TestCase): 2747 2748 def test_close(self): 2749 with self.assertRaises(NotImplementedError): 2750 events.AbstractServer().close() 2751 2752 def test_wait_closed(self): 2753 loop = asyncio.new_event_loop() 2754 self.addCleanup(loop.close) 2755 2756 with self.assertRaises(NotImplementedError): 2757 loop.run_until_complete(events.AbstractServer().wait_closed()) 2758 2759 def test_get_loop(self): 2760 with self.assertRaises(NotImplementedError): 2761 events.AbstractServer().get_loop() 2762 2763 2764if __name__ == '__main__': 2765 unittest.main() 2766