1"""Tests for unix_events.py.""" 2 3import collections 4import contextlib 5import errno 6import io 7import os 8import pathlib 9import signal 10import socket 11import stat 12import sys 13import tempfile 14import threading 15import unittest 16from unittest import mock 17from test import support 18from test.support import socket_helper 19 20if sys.platform == 'win32': 21 raise unittest.SkipTest('UNIX only') 22 23 24import asyncio 25from asyncio import log 26from asyncio import unix_events 27from test.test_asyncio import utils as test_utils 28 29 30MOCK_ANY = mock.ANY 31 32 33def tearDownModule(): 34 asyncio.set_event_loop_policy(None) 35 36 37def close_pipe_transport(transport): 38 # Don't call transport.close() because the event loop and the selector 39 # are mocked 40 if transport._pipe is None: 41 return 42 transport._pipe.close() 43 transport._pipe = None 44 45 46@unittest.skipUnless(signal, 'Signals are not supported') 47class SelectorEventLoopSignalTests(test_utils.TestCase): 48 49 def setUp(self): 50 super().setUp() 51 self.loop = asyncio.SelectorEventLoop() 52 self.set_event_loop(self.loop) 53 54 def test_check_signal(self): 55 self.assertRaises( 56 TypeError, self.loop._check_signal, '1') 57 self.assertRaises( 58 ValueError, self.loop._check_signal, signal.NSIG + 1) 59 60 def test_handle_signal_no_handler(self): 61 self.loop._handle_signal(signal.NSIG + 1) 62 63 def test_handle_signal_cancelled_handler(self): 64 h = asyncio.Handle(mock.Mock(), (), 65 loop=mock.Mock()) 66 h.cancel() 67 self.loop._signal_handlers[signal.NSIG + 1] = h 68 self.loop.remove_signal_handler = mock.Mock() 69 self.loop._handle_signal(signal.NSIG + 1) 70 self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1) 71 72 @mock.patch('asyncio.unix_events.signal') 73 def test_add_signal_handler_setup_error(self, m_signal): 74 m_signal.NSIG = signal.NSIG 75 m_signal.valid_signals = signal.valid_signals 76 m_signal.set_wakeup_fd.side_effect = ValueError 77 78 self.assertRaises( 79 
RuntimeError, 80 self.loop.add_signal_handler, 81 signal.SIGINT, lambda: True) 82 83 @mock.patch('asyncio.unix_events.signal') 84 def test_add_signal_handler_coroutine_error(self, m_signal): 85 m_signal.NSIG = signal.NSIG 86 87 async def simple_coroutine(): 88 pass 89 90 # callback must not be a coroutine function 91 coro_func = simple_coroutine 92 coro_obj = coro_func() 93 self.addCleanup(coro_obj.close) 94 for func in (coro_func, coro_obj): 95 self.assertRaisesRegex( 96 TypeError, 'coroutines cannot be used with add_signal_handler', 97 self.loop.add_signal_handler, 98 signal.SIGINT, func) 99 100 @mock.patch('asyncio.unix_events.signal') 101 def test_add_signal_handler(self, m_signal): 102 m_signal.NSIG = signal.NSIG 103 m_signal.valid_signals = signal.valid_signals 104 105 cb = lambda: True 106 self.loop.add_signal_handler(signal.SIGHUP, cb) 107 h = self.loop._signal_handlers.get(signal.SIGHUP) 108 self.assertIsInstance(h, asyncio.Handle) 109 self.assertEqual(h._callback, cb) 110 111 @mock.patch('asyncio.unix_events.signal') 112 def test_add_signal_handler_install_error(self, m_signal): 113 m_signal.NSIG = signal.NSIG 114 m_signal.valid_signals = signal.valid_signals 115 116 def set_wakeup_fd(fd): 117 if fd == -1: 118 raise ValueError() 119 m_signal.set_wakeup_fd = set_wakeup_fd 120 121 class Err(OSError): 122 errno = errno.EFAULT 123 m_signal.signal.side_effect = Err 124 125 self.assertRaises( 126 Err, 127 self.loop.add_signal_handler, 128 signal.SIGINT, lambda: True) 129 130 @mock.patch('asyncio.unix_events.signal') 131 @mock.patch('asyncio.base_events.logger') 132 def test_add_signal_handler_install_error2(self, m_logging, m_signal): 133 m_signal.NSIG = signal.NSIG 134 m_signal.valid_signals = signal.valid_signals 135 136 class Err(OSError): 137 errno = errno.EINVAL 138 m_signal.signal.side_effect = Err 139 140 self.loop._signal_handlers[signal.SIGHUP] = lambda: True 141 self.assertRaises( 142 RuntimeError, 143 self.loop.add_signal_handler, 144 signal.SIGINT, 
lambda: True) 145 self.assertFalse(m_logging.info.called) 146 self.assertEqual(1, m_signal.set_wakeup_fd.call_count) 147 148 @mock.patch('asyncio.unix_events.signal') 149 @mock.patch('asyncio.base_events.logger') 150 def test_add_signal_handler_install_error3(self, m_logging, m_signal): 151 class Err(OSError): 152 errno = errno.EINVAL 153 m_signal.signal.side_effect = Err 154 m_signal.NSIG = signal.NSIG 155 m_signal.valid_signals = signal.valid_signals 156 157 self.assertRaises( 158 RuntimeError, 159 self.loop.add_signal_handler, 160 signal.SIGINT, lambda: True) 161 self.assertFalse(m_logging.info.called) 162 self.assertEqual(2, m_signal.set_wakeup_fd.call_count) 163 164 @mock.patch('asyncio.unix_events.signal') 165 def test_remove_signal_handler(self, m_signal): 166 m_signal.NSIG = signal.NSIG 167 m_signal.valid_signals = signal.valid_signals 168 169 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 170 171 self.assertTrue( 172 self.loop.remove_signal_handler(signal.SIGHUP)) 173 self.assertTrue(m_signal.set_wakeup_fd.called) 174 self.assertTrue(m_signal.signal.called) 175 self.assertEqual( 176 (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0]) 177 178 @mock.patch('asyncio.unix_events.signal') 179 def test_remove_signal_handler_2(self, m_signal): 180 m_signal.NSIG = signal.NSIG 181 m_signal.SIGINT = signal.SIGINT 182 m_signal.valid_signals = signal.valid_signals 183 184 self.loop.add_signal_handler(signal.SIGINT, lambda: True) 185 self.loop._signal_handlers[signal.SIGHUP] = object() 186 m_signal.set_wakeup_fd.reset_mock() 187 188 self.assertTrue( 189 self.loop.remove_signal_handler(signal.SIGINT)) 190 self.assertFalse(m_signal.set_wakeup_fd.called) 191 self.assertTrue(m_signal.signal.called) 192 self.assertEqual( 193 (signal.SIGINT, m_signal.default_int_handler), 194 m_signal.signal.call_args[0]) 195 196 @mock.patch('asyncio.unix_events.signal') 197 @mock.patch('asyncio.base_events.logger') 198 def 
test_remove_signal_handler_cleanup_error(self, m_logging, m_signal): 199 m_signal.NSIG = signal.NSIG 200 m_signal.valid_signals = signal.valid_signals 201 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 202 203 m_signal.set_wakeup_fd.side_effect = ValueError 204 205 self.loop.remove_signal_handler(signal.SIGHUP) 206 self.assertTrue(m_logging.info) 207 208 @mock.patch('asyncio.unix_events.signal') 209 def test_remove_signal_handler_error(self, m_signal): 210 m_signal.NSIG = signal.NSIG 211 m_signal.valid_signals = signal.valid_signals 212 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 213 214 m_signal.signal.side_effect = OSError 215 216 self.assertRaises( 217 OSError, self.loop.remove_signal_handler, signal.SIGHUP) 218 219 @mock.patch('asyncio.unix_events.signal') 220 def test_remove_signal_handler_error2(self, m_signal): 221 m_signal.NSIG = signal.NSIG 222 m_signal.valid_signals = signal.valid_signals 223 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 224 225 class Err(OSError): 226 errno = errno.EINVAL 227 m_signal.signal.side_effect = Err 228 229 self.assertRaises( 230 RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP) 231 232 @mock.patch('asyncio.unix_events.signal') 233 def test_close(self, m_signal): 234 m_signal.NSIG = signal.NSIG 235 m_signal.valid_signals = signal.valid_signals 236 237 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 238 self.loop.add_signal_handler(signal.SIGCHLD, lambda: True) 239 240 self.assertEqual(len(self.loop._signal_handlers), 2) 241 242 m_signal.set_wakeup_fd.reset_mock() 243 244 self.loop.close() 245 246 self.assertEqual(len(self.loop._signal_handlers), 0) 247 m_signal.set_wakeup_fd.assert_called_once_with(-1) 248 249 @mock.patch('asyncio.unix_events.sys') 250 @mock.patch('asyncio.unix_events.signal') 251 def test_close_on_finalizing(self, m_signal, m_sys): 252 m_signal.NSIG = signal.NSIG 253 m_signal.valid_signals = signal.valid_signals 254 
self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 255 256 self.assertEqual(len(self.loop._signal_handlers), 1) 257 m_sys.is_finalizing.return_value = True 258 m_signal.signal.reset_mock() 259 260 with self.assertWarnsRegex(ResourceWarning, 261 "skipping signal handlers removal"): 262 self.loop.close() 263 264 self.assertEqual(len(self.loop._signal_handlers), 0) 265 self.assertFalse(m_signal.signal.called) 266 267 268@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 269 'UNIX Sockets are not supported') 270class SelectorEventLoopUnixSocketTests(test_utils.TestCase): 271 272 def setUp(self): 273 super().setUp() 274 self.loop = asyncio.SelectorEventLoop() 275 self.set_event_loop(self.loop) 276 277 @socket_helper.skip_unless_bind_unix_socket 278 def test_create_unix_server_existing_path_sock(self): 279 with test_utils.unix_socket_path() as path: 280 sock = socket.socket(socket.AF_UNIX) 281 sock.bind(path) 282 sock.listen(1) 283 sock.close() 284 285 coro = self.loop.create_unix_server(lambda: None, path) 286 srv = self.loop.run_until_complete(coro) 287 srv.close() 288 self.loop.run_until_complete(srv.wait_closed()) 289 290 @socket_helper.skip_unless_bind_unix_socket 291 def test_create_unix_server_pathlib(self): 292 with test_utils.unix_socket_path() as path: 293 path = pathlib.Path(path) 294 srv_coro = self.loop.create_unix_server(lambda: None, path) 295 srv = self.loop.run_until_complete(srv_coro) 296 srv.close() 297 self.loop.run_until_complete(srv.wait_closed()) 298 299 def test_create_unix_connection_pathlib(self): 300 with test_utils.unix_socket_path() as path: 301 path = pathlib.Path(path) 302 coro = self.loop.create_unix_connection(lambda: None, path) 303 with self.assertRaises(FileNotFoundError): 304 # If pathlib.Path wasn't supported, the exception would be 305 # different. 
306 self.loop.run_until_complete(coro) 307 308 def test_create_unix_server_existing_path_nonsock(self): 309 with tempfile.NamedTemporaryFile() as file: 310 coro = self.loop.create_unix_server(lambda: None, file.name) 311 with self.assertRaisesRegex(OSError, 312 'Address.*is already in use'): 313 self.loop.run_until_complete(coro) 314 315 def test_create_unix_server_ssl_bool(self): 316 coro = self.loop.create_unix_server(lambda: None, path='spam', 317 ssl=True) 318 with self.assertRaisesRegex(TypeError, 319 'ssl argument must be an SSLContext'): 320 self.loop.run_until_complete(coro) 321 322 def test_create_unix_server_nopath_nosock(self): 323 coro = self.loop.create_unix_server(lambda: None, path=None) 324 with self.assertRaisesRegex(ValueError, 325 'path was not specified, and no sock'): 326 self.loop.run_until_complete(coro) 327 328 def test_create_unix_server_path_inetsock(self): 329 sock = socket.socket() 330 with sock: 331 coro = self.loop.create_unix_server(lambda: None, path=None, 332 sock=sock) 333 with self.assertRaisesRegex(ValueError, 334 'A UNIX Domain Stream.*was expected'): 335 self.loop.run_until_complete(coro) 336 337 def test_create_unix_server_path_dgram(self): 338 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 339 with sock: 340 coro = self.loop.create_unix_server(lambda: None, path=None, 341 sock=sock) 342 with self.assertRaisesRegex(ValueError, 343 'A UNIX Domain Stream.*was expected'): 344 self.loop.run_until_complete(coro) 345 346 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 347 'no socket.SOCK_NONBLOCK (linux only)') 348 @socket_helper.skip_unless_bind_unix_socket 349 def test_create_unix_server_path_stream_bittype(self): 350 sock = socket.socket( 351 socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 352 with tempfile.NamedTemporaryFile() as file: 353 fn = file.name 354 try: 355 with sock: 356 sock.bind(fn) 357 coro = self.loop.create_unix_server(lambda: None, path=None, 358 sock=sock) 359 srv = 
self.loop.run_until_complete(coro) 360 srv.close() 361 self.loop.run_until_complete(srv.wait_closed()) 362 finally: 363 os.unlink(fn) 364 365 def test_create_unix_server_ssl_timeout_with_plain_sock(self): 366 coro = self.loop.create_unix_server(lambda: None, path='spam', 367 ssl_handshake_timeout=1) 368 with self.assertRaisesRegex( 369 ValueError, 370 'ssl_handshake_timeout is only meaningful with ssl'): 371 self.loop.run_until_complete(coro) 372 373 def test_create_unix_connection_path_inetsock(self): 374 sock = socket.socket() 375 with sock: 376 coro = self.loop.create_unix_connection(lambda: None, 377 sock=sock) 378 with self.assertRaisesRegex(ValueError, 379 'A UNIX Domain Stream.*was expected'): 380 self.loop.run_until_complete(coro) 381 382 @mock.patch('asyncio.unix_events.socket') 383 def test_create_unix_server_bind_error(self, m_socket): 384 # Ensure that the socket is closed on any bind error 385 sock = mock.Mock() 386 m_socket.socket.return_value = sock 387 388 sock.bind.side_effect = OSError 389 coro = self.loop.create_unix_server(lambda: None, path="/test") 390 with self.assertRaises(OSError): 391 self.loop.run_until_complete(coro) 392 self.assertTrue(sock.close.called) 393 394 sock.bind.side_effect = MemoryError 395 coro = self.loop.create_unix_server(lambda: None, path="/test") 396 with self.assertRaises(MemoryError): 397 self.loop.run_until_complete(coro) 398 self.assertTrue(sock.close.called) 399 400 def test_create_unix_connection_path_sock(self): 401 coro = self.loop.create_unix_connection( 402 lambda: None, os.devnull, sock=object()) 403 with self.assertRaisesRegex(ValueError, 'path and sock can not be'): 404 self.loop.run_until_complete(coro) 405 406 def test_create_unix_connection_nopath_nosock(self): 407 coro = self.loop.create_unix_connection( 408 lambda: None, None) 409 with self.assertRaisesRegex(ValueError, 410 'no path and sock were specified'): 411 self.loop.run_until_complete(coro) 412 413 def 
test_create_unix_connection_nossl_serverhost(self): 414 coro = self.loop.create_unix_connection( 415 lambda: None, os.devnull, server_hostname='spam') 416 with self.assertRaisesRegex(ValueError, 417 'server_hostname is only meaningful'): 418 self.loop.run_until_complete(coro) 419 420 def test_create_unix_connection_ssl_noserverhost(self): 421 coro = self.loop.create_unix_connection( 422 lambda: None, os.devnull, ssl=True) 423 424 with self.assertRaisesRegex( 425 ValueError, 'you have to pass server_hostname when using ssl'): 426 427 self.loop.run_until_complete(coro) 428 429 def test_create_unix_connection_ssl_timeout_with_plain_sock(self): 430 coro = self.loop.create_unix_connection(lambda: None, path='spam', 431 ssl_handshake_timeout=1) 432 with self.assertRaisesRegex( 433 ValueError, 434 'ssl_handshake_timeout is only meaningful with ssl'): 435 self.loop.run_until_complete(coro) 436 437 438@unittest.skipUnless(hasattr(os, 'sendfile'), 439 'sendfile is not supported') 440class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): 441 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 442 443 class MyProto(asyncio.Protocol): 444 445 def __init__(self, loop): 446 self.started = False 447 self.closed = False 448 self.data = bytearray() 449 self.fut = loop.create_future() 450 self.transport = None 451 self._ready = loop.create_future() 452 453 def connection_made(self, transport): 454 self.started = True 455 self.transport = transport 456 self._ready.set_result(None) 457 458 def data_received(self, data): 459 self.data.extend(data) 460 461 def connection_lost(self, exc): 462 self.closed = True 463 self.fut.set_result(None) 464 465 async def wait_closed(self): 466 await self.fut 467 468 @classmethod 469 def setUpClass(cls): 470 with open(support.TESTFN, 'wb') as fp: 471 fp.write(cls.DATA) 472 super().setUpClass() 473 474 @classmethod 475 def tearDownClass(cls): 476 support.unlink(support.TESTFN) 477 super().tearDownClass() 478 479 def setUp(self): 480 self.loop = 
asyncio.new_event_loop() 481 self.set_event_loop(self.loop) 482 self.file = open(support.TESTFN, 'rb') 483 self.addCleanup(self.file.close) 484 super().setUp() 485 486 def make_socket(self, cleanup=True): 487 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 488 sock.setblocking(False) 489 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 490 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) 491 if cleanup: 492 self.addCleanup(sock.close) 493 return sock 494 495 def run_loop(self, coro): 496 return self.loop.run_until_complete(coro) 497 498 def prepare(self): 499 sock = self.make_socket() 500 proto = self.MyProto(self.loop) 501 port = socket_helper.find_unused_port() 502 srv_sock = self.make_socket(cleanup=False) 503 srv_sock.bind((socket_helper.HOST, port)) 504 server = self.run_loop(self.loop.create_server( 505 lambda: proto, sock=srv_sock)) 506 self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port))) 507 self.run_loop(proto._ready) 508 509 def cleanup(): 510 proto.transport.close() 511 self.run_loop(proto.wait_closed()) 512 513 server.close() 514 self.run_loop(server.wait_closed()) 515 516 self.addCleanup(cleanup) 517 518 return sock, proto 519 520 def test_sock_sendfile_not_available(self): 521 sock, proto = self.prepare() 522 with mock.patch('asyncio.unix_events.os', spec=[]): 523 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 524 "os[.]sendfile[(][)] is not available"): 525 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 526 0, None)) 527 self.assertEqual(self.file.tell(), 0) 528 529 def test_sock_sendfile_not_a_file(self): 530 sock, proto = self.prepare() 531 f = object() 532 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 533 "not a regular file"): 534 self.run_loop(self.loop._sock_sendfile_native(sock, f, 535 0, None)) 536 self.assertEqual(self.file.tell(), 0) 537 538 def test_sock_sendfile_iobuffer(self): 539 sock, proto = self.prepare() 540 f = io.BytesIO() 541 with 
self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 542 "not a regular file"): 543 self.run_loop(self.loop._sock_sendfile_native(sock, f, 544 0, None)) 545 self.assertEqual(self.file.tell(), 0) 546 547 def test_sock_sendfile_not_regular_file(self): 548 sock, proto = self.prepare() 549 f = mock.Mock() 550 f.fileno.return_value = -1 551 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 552 "not a regular file"): 553 self.run_loop(self.loop._sock_sendfile_native(sock, f, 554 0, None)) 555 self.assertEqual(self.file.tell(), 0) 556 557 def test_sock_sendfile_cancel1(self): 558 sock, proto = self.prepare() 559 560 fut = self.loop.create_future() 561 fileno = self.file.fileno() 562 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 563 0, None, len(self.DATA), 0) 564 fut.cancel() 565 with contextlib.suppress(asyncio.CancelledError): 566 self.run_loop(fut) 567 with self.assertRaises(KeyError): 568 self.loop._selector.get_key(sock) 569 570 def test_sock_sendfile_cancel2(self): 571 sock, proto = self.prepare() 572 573 fut = self.loop.create_future() 574 fileno = self.file.fileno() 575 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 576 0, None, len(self.DATA), 0) 577 fut.cancel() 578 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno, 579 0, None, len(self.DATA), 0) 580 with self.assertRaises(KeyError): 581 self.loop._selector.get_key(sock) 582 583 def test_sock_sendfile_blocking_error(self): 584 sock, proto = self.prepare() 585 586 fileno = self.file.fileno() 587 fut = mock.Mock() 588 fut.cancelled.return_value = False 589 with mock.patch('os.sendfile', side_effect=BlockingIOError()): 590 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 591 0, None, len(self.DATA), 0) 592 key = self.loop._selector.get_key(sock) 593 self.assertIsNotNone(key) 594 fut.add_done_callback.assert_called_once_with(mock.ANY) 595 596 def test_sock_sendfile_os_error_first_call(self): 597 sock, proto = self.prepare() 598 599 
fileno = self.file.fileno() 600 fut = self.loop.create_future() 601 with mock.patch('os.sendfile', side_effect=OSError()): 602 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 603 0, None, len(self.DATA), 0) 604 with self.assertRaises(KeyError): 605 self.loop._selector.get_key(sock) 606 exc = fut.exception() 607 self.assertIsInstance(exc, asyncio.SendfileNotAvailableError) 608 self.assertEqual(0, self.file.tell()) 609 610 def test_sock_sendfile_os_error_next_call(self): 611 sock, proto = self.prepare() 612 613 fileno = self.file.fileno() 614 fut = self.loop.create_future() 615 err = OSError() 616 with mock.patch('os.sendfile', side_effect=err): 617 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 618 sock, fileno, 619 1000, None, len(self.DATA), 620 1000) 621 with self.assertRaises(KeyError): 622 self.loop._selector.get_key(sock) 623 exc = fut.exception() 624 self.assertIs(exc, err) 625 self.assertEqual(1000, self.file.tell()) 626 627 def test_sock_sendfile_exception(self): 628 sock, proto = self.prepare() 629 630 fileno = self.file.fileno() 631 fut = self.loop.create_future() 632 err = asyncio.SendfileNotAvailableError() 633 with mock.patch('os.sendfile', side_effect=err): 634 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 635 sock, fileno, 636 1000, None, len(self.DATA), 637 1000) 638 with self.assertRaises(KeyError): 639 self.loop._selector.get_key(sock) 640 exc = fut.exception() 641 self.assertIs(exc, err) 642 self.assertEqual(1000, self.file.tell()) 643 644 645class UnixReadPipeTransportTests(test_utils.TestCase): 646 647 def setUp(self): 648 super().setUp() 649 self.loop = self.new_test_loop() 650 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 651 self.pipe = mock.Mock(spec_set=io.RawIOBase) 652 self.pipe.fileno.return_value = 5 653 654 blocking_patcher = mock.patch('os.set_blocking') 655 blocking_patcher.start() 656 self.addCleanup(blocking_patcher.stop) 657 658 fstat_patcher = mock.patch('os.fstat') 659 
m_fstat = fstat_patcher.start() 660 st = mock.Mock() 661 st.st_mode = stat.S_IFIFO 662 m_fstat.return_value = st 663 self.addCleanup(fstat_patcher.stop) 664 665 def read_pipe_transport(self, waiter=None): 666 transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe, 667 self.protocol, 668 waiter=waiter) 669 self.addCleanup(close_pipe_transport, transport) 670 return transport 671 672 def test_ctor(self): 673 waiter = self.loop.create_future() 674 tr = self.read_pipe_transport(waiter=waiter) 675 self.loop.run_until_complete(waiter) 676 677 self.protocol.connection_made.assert_called_with(tr) 678 self.loop.assert_reader(5, tr._read_ready) 679 self.assertIsNone(waiter.result()) 680 681 @mock.patch('os.read') 682 def test__read_ready(self, m_read): 683 tr = self.read_pipe_transport() 684 m_read.return_value = b'data' 685 tr._read_ready() 686 687 m_read.assert_called_with(5, tr.max_size) 688 self.protocol.data_received.assert_called_with(b'data') 689 690 @mock.patch('os.read') 691 def test__read_ready_eof(self, m_read): 692 tr = self.read_pipe_transport() 693 m_read.return_value = b'' 694 tr._read_ready() 695 696 m_read.assert_called_with(5, tr.max_size) 697 self.assertFalse(self.loop.readers) 698 test_utils.run_briefly(self.loop) 699 self.protocol.eof_received.assert_called_with() 700 self.protocol.connection_lost.assert_called_with(None) 701 702 @mock.patch('os.read') 703 def test__read_ready_blocked(self, m_read): 704 tr = self.read_pipe_transport() 705 m_read.side_effect = BlockingIOError 706 tr._read_ready() 707 708 m_read.assert_called_with(5, tr.max_size) 709 test_utils.run_briefly(self.loop) 710 self.assertFalse(self.protocol.data_received.called) 711 712 @mock.patch('asyncio.log.logger.error') 713 @mock.patch('os.read') 714 def test__read_ready_error(self, m_read, m_logexc): 715 tr = self.read_pipe_transport() 716 err = OSError() 717 m_read.side_effect = err 718 tr._close = mock.Mock() 719 tr._read_ready() 720 721 m_read.assert_called_with(5, 
tr.max_size) 722 tr._close.assert_called_with(err) 723 m_logexc.assert_called_with( 724 test_utils.MockPattern( 725 'Fatal read error on pipe transport' 726 '\nprotocol:.*\ntransport:.*'), 727 exc_info=(OSError, MOCK_ANY, MOCK_ANY)) 728 729 @mock.patch('os.read') 730 def test_pause_reading(self, m_read): 731 tr = self.read_pipe_transport() 732 m = mock.Mock() 733 self.loop.add_reader(5, m) 734 tr.pause_reading() 735 self.assertFalse(self.loop.readers) 736 737 @mock.patch('os.read') 738 def test_resume_reading(self, m_read): 739 tr = self.read_pipe_transport() 740 tr.pause_reading() 741 tr.resume_reading() 742 self.loop.assert_reader(5, tr._read_ready) 743 744 @mock.patch('os.read') 745 def test_close(self, m_read): 746 tr = self.read_pipe_transport() 747 tr._close = mock.Mock() 748 tr.close() 749 tr._close.assert_called_with(None) 750 751 @mock.patch('os.read') 752 def test_close_already_closing(self, m_read): 753 tr = self.read_pipe_transport() 754 tr._closing = True 755 tr._close = mock.Mock() 756 tr.close() 757 self.assertFalse(tr._close.called) 758 759 @mock.patch('os.read') 760 def test__close(self, m_read): 761 tr = self.read_pipe_transport() 762 err = object() 763 tr._close(err) 764 self.assertTrue(tr.is_closing()) 765 self.assertFalse(self.loop.readers) 766 test_utils.run_briefly(self.loop) 767 self.protocol.connection_lost.assert_called_with(err) 768 769 def test__call_connection_lost(self): 770 tr = self.read_pipe_transport() 771 self.assertIsNotNone(tr._protocol) 772 self.assertIsNotNone(tr._loop) 773 774 err = None 775 tr._call_connection_lost(err) 776 self.protocol.connection_lost.assert_called_with(err) 777 self.pipe.close.assert_called_with() 778 779 self.assertIsNone(tr._protocol) 780 self.assertIsNone(tr._loop) 781 782 def test__call_connection_lost_with_err(self): 783 tr = self.read_pipe_transport() 784 self.assertIsNotNone(tr._protocol) 785 self.assertIsNotNone(tr._loop) 786 787 err = OSError() 788 tr._call_connection_lost(err) 789 
self.protocol.connection_lost.assert_called_with(err) 790 self.pipe.close.assert_called_with() 791 792 self.assertIsNone(tr._protocol) 793 self.assertIsNone(tr._loop) 794 795 def test_pause_reading_on_closed_pipe(self): 796 tr = self.read_pipe_transport() 797 tr.close() 798 test_utils.run_briefly(self.loop) 799 self.assertIsNone(tr._loop) 800 tr.pause_reading() 801 802 def test_pause_reading_on_paused_pipe(self): 803 tr = self.read_pipe_transport() 804 tr.pause_reading() 805 # the second call should do nothing 806 tr.pause_reading() 807 808 def test_resume_reading_on_closed_pipe(self): 809 tr = self.read_pipe_transport() 810 tr.close() 811 test_utils.run_briefly(self.loop) 812 self.assertIsNone(tr._loop) 813 tr.resume_reading() 814 815 def test_resume_reading_on_paused_pipe(self): 816 tr = self.read_pipe_transport() 817 # the pipe is not paused 818 # resuming should do nothing 819 tr.resume_reading() 820 821 822class UnixWritePipeTransportTests(test_utils.TestCase): 823 824 def setUp(self): 825 super().setUp() 826 self.loop = self.new_test_loop() 827 self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol) 828 self.pipe = mock.Mock(spec_set=io.RawIOBase) 829 self.pipe.fileno.return_value = 5 830 831 blocking_patcher = mock.patch('os.set_blocking') 832 blocking_patcher.start() 833 self.addCleanup(blocking_patcher.stop) 834 835 fstat_patcher = mock.patch('os.fstat') 836 m_fstat = fstat_patcher.start() 837 st = mock.Mock() 838 st.st_mode = stat.S_IFSOCK 839 m_fstat.return_value = st 840 self.addCleanup(fstat_patcher.stop) 841 842 def write_pipe_transport(self, waiter=None): 843 transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe, 844 self.protocol, 845 waiter=waiter) 846 self.addCleanup(close_pipe_transport, transport) 847 return transport 848 849 def test_ctor(self): 850 waiter = self.loop.create_future() 851 tr = self.write_pipe_transport(waiter=waiter) 852 self.loop.run_until_complete(waiter) 853 854 
self.protocol.connection_made.assert_called_with(tr) 855 self.loop.assert_reader(5, tr._read_ready) 856 self.assertEqual(None, waiter.result()) 857 858 def test_can_write_eof(self): 859 tr = self.write_pipe_transport() 860 self.assertTrue(tr.can_write_eof()) 861 862 @mock.patch('os.write') 863 def test_write(self, m_write): 864 tr = self.write_pipe_transport() 865 m_write.return_value = 4 866 tr.write(b'data') 867 m_write.assert_called_with(5, b'data') 868 self.assertFalse(self.loop.writers) 869 self.assertEqual(bytearray(), tr._buffer) 870 871 @mock.patch('os.write') 872 def test_write_no_data(self, m_write): 873 tr = self.write_pipe_transport() 874 tr.write(b'') 875 self.assertFalse(m_write.called) 876 self.assertFalse(self.loop.writers) 877 self.assertEqual(bytearray(b''), tr._buffer) 878 879 @mock.patch('os.write') 880 def test_write_partial(self, m_write): 881 tr = self.write_pipe_transport() 882 m_write.return_value = 2 883 tr.write(b'data') 884 self.loop.assert_writer(5, tr._write_ready) 885 self.assertEqual(bytearray(b'ta'), tr._buffer) 886 887 @mock.patch('os.write') 888 def test_write_buffer(self, m_write): 889 tr = self.write_pipe_transport() 890 self.loop.add_writer(5, tr._write_ready) 891 tr._buffer = bytearray(b'previous') 892 tr.write(b'data') 893 self.assertFalse(m_write.called) 894 self.loop.assert_writer(5, tr._write_ready) 895 self.assertEqual(bytearray(b'previousdata'), tr._buffer) 896 897 @mock.patch('os.write') 898 def test_write_again(self, m_write): 899 tr = self.write_pipe_transport() 900 m_write.side_effect = BlockingIOError() 901 tr.write(b'data') 902 m_write.assert_called_with(5, bytearray(b'data')) 903 self.loop.assert_writer(5, tr._write_ready) 904 self.assertEqual(bytearray(b'data'), tr._buffer) 905 906 @mock.patch('asyncio.unix_events.logger') 907 @mock.patch('os.write') 908 def test_write_err(self, m_write, m_log): 909 tr = self.write_pipe_transport() 910 err = OSError() 911 m_write.side_effect = err 912 tr._fatal_error = 
mock.Mock()
        tr.write(b'data')
        m_write.assert_called_with(5, b'data')
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)
        tr._fatal_error.assert_called_with(
                            err,
                            'Fatal write error on pipe transport')
        self.assertEqual(1, tr._conn_lost)

        # Every further write on the broken transport only bumps _conn_lost.
        tr.write(b'data')
        self.assertEqual(2, tr._conn_lost)
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        # This is a bit overspecified. :-(
        m_log.warning.assert_called_with(
            'pipe closed by peer or os.write(pipe, data) raised exception.')
        tr.close()

    # After _read_ready() each write() is expected to only increment
    # _conn_lost (1, then 2).
    @mock.patch('os.write')
    def test_write_close(self, m_write):
        tr = self.write_pipe_transport()
        tr._read_ready()  # pipe was closed by peer

        tr.write(b'data')
        self.assertEqual(tr._conn_lost, 1)
        tr.write(b'data')
        self.assertEqual(tr._conn_lost, 2)

    # _read_ready() must leave no reader/writer registered, mark the
    # transport as closing and, once the loop has run briefly, report
    # connection_lost(None) to the protocol.
    def test__read_ready(self):
        tr = self.write_pipe_transport()
        tr._read_ready()
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)
        self.assertTrue(tr.is_closing())
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    # os.write() reports all 4 buffered bytes written: the writer is
    # removed and the buffer is left empty.
    @mock.patch('os.write')
    def test__write_ready(self, m_write):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 4
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)

    # os.write() reports 3 of 4 bytes written: the writer stays registered
    # and the remaining b'a' is kept in the buffer.
    @mock.patch('os.write')
    def test__write_ready_partial(self, m_write):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 3
        tr._write_ready()
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'a'), tr._buffer)

    # os.write() raises BlockingIOError: the buffer is untouched and the
    # writer stays registered.
    @mock.patch('os.write')
    def test__write_ready_again(self, m_write):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.side_effect = BlockingIOError()
        tr._write_ready()
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    # os.write() reports 0 bytes written: same expectations as the
    # BlockingIOError case above.
    @mock.patch('os.write')
    def test__write_ready_empty(self, m_write):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 0
        tr._write_ready()
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    # os.write() raises OSError: reader and writer are removed, the buffer is
    # cleared, the transport is closing, logger.error is not called, and the
    # protocol receives connection_lost(err) after the loop runs briefly.
    # Decorators apply bottom-up, so m_write is the os.write mock and
    # m_logexc is the asyncio.log.logger.error mock.
    @mock.patch('asyncio.log.logger.error')
    @mock.patch('os.write')
    def test__write_ready_err(self, m_write, m_logexc):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.side_effect = err = OSError()
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertFalse(self.loop.readers)
        self.assertEqual(bytearray(), tr._buffer)
        self.assertTrue(tr.is_closing())
        m_logexc.assert_not_called()
        self.assertEqual(1, tr._conn_lost)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(err)

    # With _closing already set, flushing the whole buffer must finish the
    # shutdown: connection_lost(None) is called and the pipe is closed.
    @mock.patch('os.write')
    def test__write_ready_closing(self, m_write):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._closing = True
        tr._buffer = bytearray(b'data')
        m_write.return_value = 4
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertFalse(self.loop.readers)
        self.assertEqual(bytearray(), tr._buffer)
        self.protocol.connection_lost.assert_called_with(None)
        self.pipe.close.assert_called_with()

    # abort() must drop the pending buffer without calling os.write(),
    # unregister reader and writer, and report connection_lost(None).
    @mock.patch('os.write')
    def test_abort(self, m_write):
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        self.loop.add_reader(5, tr._read_ready)
        tr._buffer = [b'da', b'ta']
        tr.abort()
        self.assertFalse(m_write.called)
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)
        self.assertEqual([], tr._buffer)
        self.assertTrue(tr.is_closing())
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    # _call_connection_lost(None) forwards None to the protocol, closes the
    # pipe and clears the transport's _protocol and _loop references.
    def test__call_connection_lost(self):
        tr = self.write_pipe_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)

        err = None
        tr._call_connection_lost(err)
        self.protocol.connection_lost.assert_called_with(err)
        self.pipe.close.assert_called_with()

        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    # Same as test__call_connection_lost, but with an OSError instance that
    # must reach the protocol unchanged.
    def test__call_connection_lost_with_err(self):
        tr = self.write_pipe_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)

        err = OSError()
        tr._call_connection_lost(err)
        self.protocol.connection_lost.assert_called_with(err)
        self.pipe.close.assert_called_with()

        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    # close() is expected to call write_eof() (replaced by a mock here).
    def test_close(self):
        tr = self.write_pipe_transport()
        tr.write_eof = mock.Mock()
        tr.close()
        tr.write_eof.assert_called_with()

        # closing the transport twice must not fail
        tr.close()

    # close() on a transport whose _closing flag is already set must not
    # call write_eof().
    def test_close_closing(self):
        tr = self.write_pipe_transport()
        tr.write_eof = mock.Mock()
        tr._closing = True
        tr.close()
        self.assertFalse(tr.write_eof.called)

    # write_eof() without pending data: the transport is closing, no reader
    # is left, and connection_lost(None) arrives after the loop runs briefly.
    def test_write_eof(self):
        tr = self.write_pipe_transport()
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    # write_eof() with data still buffered: the transport is closing but
    # connection_lost() has not been called.
    def test_write_eof_pending(self):
        tr = self.write_pipe_transport()
        tr._buffer = [b'data']
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.protocol.connection_lost.called)


# Every method of the abstract watcher interface is expected to raise
# NotImplementedError.
class AbstractChildWatcherTests(unittest.TestCase):

    def test_not_implemented(self):
        # f is a throw-away placeholder for every positional argument.
        f = mock.Mock()
        watcher = asyncio.AbstractChildWatcher()
        self.assertRaises(
            NotImplementedError, watcher.add_child_handler, f, f)
        self.assertRaises(
            NotImplementedError, watcher.remove_child_handler, f)
        self.assertRaises(
            NotImplementedError, watcher.attach_loop, f)
        self.assertRaises(
            NotImplementedError, watcher.close)
        self.assertRaises(
            NotImplementedError, watcher.is_active)
        self.assertRaises(
            NotImplementedError, watcher.__enter__)
        self.assertRaises(
            NotImplementedError, watcher.__exit__, f, f, f)


# BaseChildWatcher._do_waitpid is expected to raise NotImplementedError.
class BaseChildWatcherTests(unittest.TestCase):

    def test_not_implemented(self):
        f = mock.Mock()
        watcher = unix_events.BaseChildWatcher()
        self.assertRaises(
            NotImplementedError, watcher._do_waitpid, f)


# Bundle of the mocks installed by ChildWatcherTestsMixin.waitpid_mocks;
# each field holds the Mock that replaces the os function of the same name.
WaitPidMocks = collections.namedtuple("WaitPidMocks",
                                      ("waitpid",
                                       "WIFEXITED",
                                       "WIFSIGNALED",
                                       "WEXITSTATUS",
                                       "WTERMSIG",
                                       ))


# Tests shared by the concrete child-watcher test classes below, which supply
# create_watcher().  os.waitpid and the os.W* status helpers are replaced
# (see waitpid_mocks) by the fake implementations defined on this class, so no
# real child process is involved.
#
# Fake status encoding used by the helpers:
#   * add_zombie(pid, returncode) stores returncode + 32768;
#   * status >= 32768 counts as "exited" and decodes to status - 32768;
#   * 32700 < status < 32768 counts as "signaled" and decodes to
#     32768 - status (so add_zombie(pid, -3) models termination by signal 3);
#   * anything else is neither (see test_sigchld_unknown_status).
class ChildWatcherTestsMixin:

    # Context manager that silences log.logger.warning while it is active.
    ignore_warnings = mock.patch.object(log.logger, "warning")

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        # self.running: whether the fake waitpid() pretends that children
        # are still alive when no zombie is available.
        self.running = False
        # self.zombies: pid -> fake wait status of terminated children.
        self.zombies = {}

        # add_signal_handler is mocked while the watcher is attached; the
        # mock is kept on self for test_create_watcher.
        with mock.patch.object(
                self.loop, "add_signal_handler") as self.m_add_signal_handler:
            self.watcher = self.create_watcher()
            self.watcher.attach_loop(self.loop)

    # Fake os.waitpid(): returns (pid, status) for a recorded zombie; with no
    # matching zombie it returns (0, 0) while self.running is true and raises
    # ChildProcessError otherwise.  `flags` is accepted but not inspected.
    def waitpid(self, pid, flags):
        # Only a watcher other than SafeChildWatcher may pass pid == -1.
        if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
            self.assertGreater(pid, 0)
        try:
            if pid < 0:
                # "any child": hand out an arbitrary zombie
                return self.zombies.popitem()
            else:
                return pid, self.zombies.pop(pid)
        except KeyError:
            # no (matching) zombie recorded
            pass
        if self.running:
            return 0, 0
        else:
            raise ChildProcessError()

    # Record a terminated child; see the status encoding described above.
    def add_zombie(self, pid, returncode):
        self.zombies[pid] = returncode + 32768

    # Fake os.WIFEXITED().
    def WIFEXITED(self, status):
        return status >= 32768

    # Fake os.WIFSIGNALED().
    def WIFSIGNALED(self, status):
        return 32700 < status < 32768

    # Fake os.WEXITSTATUS(); asserts it is only used on an "exited" status.
    def WEXITSTATUS(self, status):
        self.assertTrue(self.WIFEXITED(status))
        return status - 32768

    # Fake os.WTERMSIG(); asserts it is only used on a "signaled" status.
    def WTERMSIG(self, status):
        self.assertTrue(self.WIFSIGNALED(status))
        return 32768 - status

    # Attaching the loop in setUp() must have registered the watcher's
    # _sig_chld as the SIGCHLD handler.
    def test_create_watcher(self):
        self.m_add_signal_handler.assert_called_once_with(
            signal.SIGCHLD, self.watcher._sig_chld)

    # Decorator used inside this class body (hence no `self` parameter):
    # runs the wrapped test with os.waitpid and the os.W* helpers patched by
    # Mock objects that wrap the fakes above, and passes those mocks to the
    # test as a WaitPidMocks tuple.
    def waitpid_mocks(func):
        def wrapped_func(self):
            def patch(target, wrapper):
                return mock.patch(target, wraps=wrapper,
                                  new_callable=mock.Mock)

            with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
                 patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
                 patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
                 patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
                 patch('os.waitpid', self.waitpid) as m_waitpid:
                func(self, WaitPidMocks(m_waitpid,
                                        m_WIFEXITED, m_WIFSIGNALED,
                                        m_WEXITSTATUS, m_WTERMSIG,
                                        ))
        return wrapped_func

    # One child: running -> exits with code 12 -> already reaped.
    @waitpid_mocks
    def test_sigchld(self, m):
        # register a child
        callback = mock.Mock()

        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(42, callback, 9, 10, 14)

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child is running
        self.watcher._sig_chld()

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child terminates (returncode 12)
        self.running = False
        self.add_zombie(42, 12)
        self.watcher._sig_chld()

        self.assertTrue(m.WIFEXITED.called)
        self.assertTrue(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)
        # the extra add_child_handler() arguments follow (pid, returncode)
        callback.assert_called_once_with(42, 12, 9, 10, 14)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WEXITSTATUS.reset_mock()
        callback.reset_mock()

        # ensure that the child is effectively reaped
        self.add_zombie(42, 13)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback.called)
        self.assertFalse(m.WTERMSIG.called)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WEXITSTATUS.reset_mock()

        # sigchld called again
        self.zombies.clear()
        self.watcher._sig_chld()

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

    # Two children terminating one after the other: child 1 by signal 3,
    # then child 2 with exit code 108.
    @waitpid_mocks
    def test_sigchld_two_children(self, m):
        callback1 = mock.Mock()
        callback2 = mock.Mock()

        # register child 1
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(43, callback1, 7, 8)

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # register child 2
        with self.watcher:
            self.watcher.add_child_handler(44, callback2, 147, 18)

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # children are running
        self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child 1 terminates (signal 3)
        self.add_zombie(43, -3)
        self.watcher._sig_chld()

        callback1.assert_called_once_with(43, -3, 7, 8)
        self.assertFalse(callback2.called)
        self.assertTrue(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertTrue(m.WTERMSIG.called)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WTERMSIG.reset_mock()
        callback1.reset_mock()

        # child 2 still running
        self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child 2 terminates (code 108)
        self.add_zombie(44, 108)
        self.running = False
        self.watcher._sig_chld()

        callback2.assert_called_once_with(44, 108, 147, 18)
        self.assertFalse(callback1.called)
        self.assertTrue(m.WIFEXITED.called)
        self.assertTrue(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WEXITSTATUS.reset_mock()
        callback2.reset_mock()

        # ensure that the children are effectively reaped
        self.add_zombie(43, 14)
        self.add_zombie(44, 15)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WTERMSIG.called)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WEXITSTATUS.reset_mock()

        # sigchld called again
        self.zombies.clear()
        self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

    # Two children whose termination is reported by a single SIGCHLD.
    @waitpid_mocks
    def test_sigchld_two_children_terminating_together(self, m):
        callback1 = mock.Mock()
        callback2 = mock.Mock()

        # register child 1
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(45, callback1, 17, 8)

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # register child 2
        with self.watcher:
            self.watcher.add_child_handler(46, callback2, 1147, 18)

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # children are running
        self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child 1 terminates (code 78)
        # child 2 terminates (signal 5)
        self.add_zombie(45, 78)
        self.add_zombie(46, -5)
        self.running = False
        self.watcher._sig_chld()

        callback1.assert_called_once_with(45, 78, 17, 8)
        callback2.assert_called_once_with(46, -5, 1147, 18)
        self.assertTrue(m.WIFSIGNALED.called)
        self.assertTrue(m.WIFEXITED.called)
        self.assertTrue(m.WEXITSTATUS.called)
        self.assertTrue(m.WTERMSIG.called)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WTERMSIG.reset_mock()
        m.WEXITSTATUS.reset_mock()
        callback1.reset_mock()
        callback2.reset_mock()

        # ensure that the children are effectively reaped
        self.add_zombie(45, 14)
        self.add_zombie(46, 15)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WTERMSIG.called)

    # The child terminates, and SIGCHLD is handled, before its handler is
    # registered; the callback must still be called with its return code.
    @waitpid_mocks
    def test_sigchld_race_condition(self, m):
        # register a child
        callback = mock.Mock()

        with self.watcher:
            # child terminates before being registered
            self.add_zombie(50, 4)
            self.watcher._sig_chld()

            self.watcher.add_child_handler(50, callback, 1, 12)

        callback.assert_called_once_with(50, 4, 1, 12)
        callback.reset_mock()

        # ensure that the child is effectively reaped
        self.add_zombie(50, -1)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback.called)

    # Registering a second handler for the same pid replaces the first one:
    # only callback2 is called when the child terminates.
    @waitpid_mocks
    def test_sigchld_replace_handler(self, m):
        callback1 = mock.Mock()
        callback2 = mock.Mock()

        # register a child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(51, callback1, 19)

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # register the same child again
        with self.watcher:
            self.watcher.add_child_handler(51, callback2, 21)

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child terminates (signal 8)
        self.running = False
        self.add_zombie(51, -8)
        self.watcher._sig_chld()

        callback2.assert_called_once_with(51, -8, 21)
        self.assertFalse(callback1.called)
        self.assertTrue(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertTrue(m.WTERMSIG.called)

        m.WIFSIGNALED.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WTERMSIG.reset_mock()
        callback2.reset_mock()

        # ensure that the child is effectively reaped
        self.add_zombie(51, 13)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(m.WTERMSIG.called)

    # A handler removed before the child terminates must never be called.
    @waitpid_mocks
    def test_sigchld_remove_handler(self, m):
        callback = mock.Mock()

        # register a child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(52, callback, 1984)

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # unregister the child
        self.watcher.remove_child_handler(52)

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child terminates (code 99)
        self.running = False
        self.add_zombie(52, 99)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback.called)

    # A status that the fakes classify as neither "exited" nor "signaled"
    # is passed to the callback unchanged.
    @waitpid_mocks
    def test_sigchld_unknown_status(self, m):
        callback = mock.Mock()

        # register a child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(53, callback, -19)

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # terminate with unknown status
        # (written directly, bypassing add_zombie()'s +32768 offset)
        self.zombies[53] = 1178
        self.running = False
        self.watcher._sig_chld()

        callback.assert_called_once_with(53, 1178, -19)
        self.assertTrue(m.WIFEXITED.called)
        self.assertTrue(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        callback.reset_mock()
        m.WIFEXITED.reset_mock()
        m.WIFSIGNALED.reset_mock()

        # ensure that the child is effectively reaped
        self.add_zombie(53, 101)
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback.called)

    # remove_child_handler() returns True when a handler was removed and
    # False when none was registered; removed handlers are never called.
    @waitpid_mocks
    def test_remove_child_handler(self, m):
        callback1 = mock.Mock()
        callback2 = mock.Mock()
        callback3 = mock.Mock()

        # register children
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(54, callback1, 1)
            self.watcher.add_child_handler(55, callback2, 2)
            self.watcher.add_child_handler(56, callback3, 3)

        # remove child handler 1
        self.assertTrue(self.watcher.remove_child_handler(54))

        # remove child handler 2 multiple times
        self.assertTrue(self.watcher.remove_child_handler(55))
        self.assertFalse(self.watcher.remove_child_handler(55))
        self.assertFalse(self.watcher.remove_child_handler(55))

        # all children terminate
        self.add_zombie(54, 0)
        self.add_zombie(55, 1)
        self.add_zombie(56, 2)
        self.running = False
        with self.ignore_warnings:
            self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        callback3.assert_called_once_with(56, 2, 3)

    # An unexpected exception from waitpid() must not escape _sig_chld();
    # it is expected to be reported through log.logger.error instead.
    @waitpid_mocks
    def test_sigchld_unhandled_exception(self, m):
        callback = mock.Mock()

        # register a child
        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(57, callback)

        # raise an exception
        m.waitpid.side_effect = ValueError

        with mock.patch.object(log.logger,
                               'error') as m_error:

            self.assertEqual(self.watcher._sig_chld(), None)
            self.assertTrue(m_error.called)

    # The child is reaped by a direct os.waitpid() call before the watcher
    # handles SIGCHLD.
    @waitpid_mocks
    def test_sigchld_child_reaped_elsewhere(self, m):
        # register a child
        callback = mock.Mock()

        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(58, callback)

        self.assertFalse(callback.called)
        self.assertFalse(m.WIFEXITED.called)
        self.assertFalse(m.WIFSIGNALED.called)
        self.assertFalse(m.WEXITSTATUS.called)
        self.assertFalse(m.WTERMSIG.called)

        # child terminates
        self.running = False
        self.add_zombie(58, 4)

        # waitpid is called elsewhere
        # (os.waitpid is the patched fake here, so this pops the zombie)
        os.waitpid(58, os.WNOHANG)

        m.waitpid.reset_mock()

        # sigchld
        with self.ignore_warnings:
            self.watcher._sig_chld()

        if isinstance(self.watcher, asyncio.FastChildWatcher):
            # here the FastChildWatcher enters a deadlock
            # (there is no way to prevent it)
            self.assertFalse(callback.called)
        else:
            # other watchers report return code 255 for the lost child
            callback.assert_called_once_with(58, 255)

    # While handlers are being registered, a SIGCHLD reports both a child
    # that gets registered afterwards (591) and an unknown one (593).
    @waitpid_mocks
    def test_sigchld_unknown_pid_during_registration(self, m):
        # register two children
        callback1 = mock.Mock()
        callback2 = mock.Mock()

        with self.ignore_warnings, self.watcher:
            self.running = True
            # child 1 terminates
            self.add_zombie(591, 7)
            # an unknown child terminates
            self.add_zombie(593, 17)

            self.watcher._sig_chld()

            self.watcher.add_child_handler(591, callback1)
            self.watcher.add_child_handler(592, callback2)

        callback1.assert_called_once_with(591, 7)
        self.assertFalse(callback2.called)

    # Attaching a new loop moves the SIGCHLD handler from the old loop to
    # the new one; already registered child handlers keep working.
    @waitpid_mocks
    def test_set_loop(self, m):
        # register a child
        callback = mock.Mock()

        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(60, callback)

        # attach a new loop
        old_loop = self.loop
        self.loop = self.new_test_loop()
        patch = mock.patch.object

        with patch(old_loop, "remove_signal_handler") as m_old_remove, \
             patch(self.loop, "add_signal_handler") as m_new_add:

            self.watcher.attach_loop(self.loop)

            m_old_remove.assert_called_once_with(
                signal.SIGCHLD)
            m_new_add.assert_called_once_with(
                signal.SIGCHLD, self.watcher._sig_chld)

        # child terminates
        self.running = False
        self.add_zombie(60, 9)
        self.watcher._sig_chld()

        callback.assert_called_once_with(60, 9)

    # Children terminate while no loop is attached; attaching a new loop
    # must deliver the callbacks for the terminations that were missed.
    @waitpid_mocks
    def test_set_loop_race_condition(self, m):
        # register 3 children
        callback1 = mock.Mock()
        callback2 = mock.Mock()
        callback3 = mock.Mock()

        with self.watcher:
            self.running = True
            self.watcher.add_child_handler(61, callback1)
            self.watcher.add_child_handler(62, callback2)
            self.watcher.add_child_handler(622, callback3)

        # detach the loop
        old_loop = self.loop
        self.loop = None

        with mock.patch.object(
                old_loop, "remove_signal_handler") as m_remove_signal_handler:

            # detaching while handlers are pending emits a RuntimeWarning
            with self.assertWarnsRegex(
                    RuntimeWarning, 'A loop is being detached'):
                self.watcher.attach_loop(None)

            m_remove_signal_handler.assert_called_once_with(
                signal.SIGCHLD)

        # child 1 & 2 terminate
        self.add_zombie(61, 11)
        self.add_zombie(62, -5)

        # SIGCHLD was not caught
        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        self.assertFalse(callback3.called)

        # attach a new loop
        self.loop = self.new_test_loop()

        with mock.patch.object(
                self.loop, "add_signal_handler") as m_add_signal_handler:

            self.watcher.attach_loop(self.loop)

            m_add_signal_handler.assert_called_once_with(
                signal.SIGCHLD, self.watcher._sig_chld)
            callback1.assert_called_once_with(61, 11)  # race condition!
            callback2.assert_called_once_with(62, -5)  # race condition!
            self.assertFalse(callback3.called)

        callback1.reset_mock()
        callback2.reset_mock()

        # child 3 terminates
        self.running = False
        self.add_zombie(622, 19)
        self.watcher._sig_chld()

        self.assertFalse(callback1.called)
        self.assertFalse(callback2.called)
        callback3.assert_called_once_with(622, 19)

    # close() removes the SIGCHLD handler and drops the remaining callbacks
    # (and, for FastChildWatcher, the remembered zombies).
    @waitpid_mocks
    def test_close(self, m):
        # register two children
        callback1 = mock.Mock()

        with self.watcher:
            self.running = True
            # child 1 terminates
            self.add_zombie(63, 9)
            # other child terminates
            self.add_zombie(65, 18)
            self.watcher._sig_chld()

            self.watcher.add_child_handler(63, callback1)
            self.watcher.add_child_handler(64, callback1)

            # only pid 64 is still waiting for its child
            self.assertEqual(len(self.watcher._callbacks), 1)
            if isinstance(self.watcher, asyncio.FastChildWatcher):
                self.assertEqual(len(self.watcher._zombies), 1)

            with mock.patch.object(
                    self.loop,
                    "remove_signal_handler") as m_remove_signal_handler:

                self.watcher.close()

                m_remove_signal_handler.assert_called_once_with(
                    signal.SIGCHLD)
                self.assertFalse(self.watcher._callbacks)
                if isinstance(self.watcher, asyncio.FastChildWatcher):
                    self.assertFalse(self.watcher._zombies)


# Runs the shared watcher tests against asyncio.SafeChildWatcher.
class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
    def create_watcher(self):
        return asyncio.SafeChildWatcher()


# Runs the shared watcher tests against asyncio.FastChildWatcher.
class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
    def create_watcher(self):
        return asyncio.FastChildWatcher()


# Tests for the child-watcher handling of the default event loop policy.
class PolicyTests(unittest.TestCase):

    def create_policy(self):
        return asyncio.DefaultEventLoopPolicy()

    # A fresh policy has no watcher; the first get_child_watcher() call
    # creates a ThreadedChildWatcher and later calls return the same object.
    def test_get_default_child_watcher(self):
        policy = self.create_policy()
        self.assertIsNone(policy._watcher)

        watcher = policy.get_child_watcher()
        self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)

        self.assertIs(policy._watcher, watcher)

        self.assertIs(watcher, policy.get_child_watcher())

    # A watcher installed with set_child_watcher() is the one returned by
    # get_child_watcher().
    def test_get_child_watcher_after_set(self):
        policy = self.create_policy()
        watcher = asyncio.FastChildWatcher()

        policy.set_child_watcher(watcher)
        self.assertIs(policy._watcher, watcher)
        self.assertIs(watcher, policy.get_child_watcher())

    # Setting an event loop from a non-main thread must leave the watcher
    # without an attached loop.
    def test_get_child_watcher_thread(self):

        # NOTE(review): the assertions below run in the worker thread; an
        # exception raised there is not re-raised by th.join(), so confirm
        # that the test harness reports such failures.
        def f():
            policy.set_event_loop(policy.new_event_loop())

            self.assertIsInstance(policy.get_event_loop(),
                                  asyncio.AbstractEventLoop)
            watcher = policy.get_child_watcher()

            self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
            self.assertIsNone(watcher._loop)

            policy.get_event_loop().close()

        policy = self.create_policy()
        policy.set_child_watcher(asyncio.SafeChildWatcher())

        th = threading.Thread(target=f)
        th.start()
        th.join()

    # set_event_loop() in the current thread re-attaches the installed
    # watcher to the new loop, and detaches it when the loop is set to None.
    def test_child_watcher_replace_mainloop_existing(self):
        policy = self.create_policy()
        loop = policy.get_event_loop()

        # Explicitly setup SafeChildWatcher,
        # default ThreadedChildWatcher has no _loop property
        watcher = asyncio.SafeChildWatcher()
        policy.set_child_watcher(watcher)
        watcher.attach_loop(loop)

        self.assertIs(watcher._loop, loop)

        new_loop = policy.new_event_loop()
        policy.set_event_loop(new_loop)

        self.assertIs(watcher._loop, new_loop)

        policy.set_event_loop(None)

        self.assertIs(watcher._loop, None)

        loop.close()
        new_loop.close()


# Tests that run against a real event loop instead of a mocked one.
class TestFunctional(unittest.TestCase):

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        asyncio.set_event_loop(None)

    # add/remove reader/writer reject an object that is not a valid file
    # object with ValueError('Invalid file object ...').
    def test_add_reader_invalid_argument(self):
        def assert_raises():
            return self.assertRaisesRegex(ValueError, r'Invalid file object')

        cb = lambda: None

        with assert_raises():
            self.loop.add_reader(object(), cb)
        with assert_raises():
            self.loop.add_writer(object(), cb)

        with assert_raises():
            self.loop.remove_reader(object())
        with assert_raises():
            self.loop.remove_writer(object())

    # Once a socket is owned by a transport, add/remove reader/writer on the
    # socket or on its file descriptor must raise RuntimeError.
    def test_add_reader_or_writer_transport_fd(self):
        def assert_raises():
            return self.assertRaisesRegex(
                RuntimeError,
                r'File descriptor .* is used by transport')

        async def runner():
            # rsock is bound below, before runner() is executed
            tr, pr = await self.loop.create_connection(
                lambda: asyncio.Protocol(), sock=rsock)

            try:
                cb = lambda: None

                with assert_raises():
                    self.loop.add_reader(rsock, cb)
                with assert_raises():
                    self.loop.add_reader(rsock.fileno(), cb)

                with assert_raises():
                    self.loop.remove_reader(rsock)
                with assert_raises():
                    self.loop.remove_reader(rsock.fileno())

                with assert_raises():
                    self.loop.add_writer(rsock, cb)
                with assert_raises():
                    self.loop.add_writer(rsock.fileno(), cb)

                with assert_raises():
                    self.loop.remove_writer(rsock)
                with assert_raises():
                    self.loop.remove_writer(rsock.fileno())

            finally:
                tr.close()

        rsock, wsock = socket.socketpair()
        try:
            self.loop.run_until_complete(runner())
        finally:
            rsock.close()
            wsock.close()


if __name__ == '__main__':
    unittest.main()