1"""Tests for unix_events.py.""" 2 3import collections 4import contextlib 5import errno 6import io 7import os 8import pathlib 9import signal 10import socket 11import stat 12import sys 13import tempfile 14import threading 15import unittest 16from unittest import mock 17from test import support 18 19if sys.platform == 'win32': 20 raise unittest.SkipTest('UNIX only') 21 22 23import asyncio 24from asyncio import log 25from asyncio import unix_events 26from test.test_asyncio import utils as test_utils 27 28 29MOCK_ANY = mock.ANY 30 31 32def tearDownModule(): 33 asyncio.set_event_loop_policy(None) 34 35 36def close_pipe_transport(transport): 37 # Don't call transport.close() because the event loop and the selector 38 # are mocked 39 if transport._pipe is None: 40 return 41 transport._pipe.close() 42 transport._pipe = None 43 44 45@unittest.skipUnless(signal, 'Signals are not supported') 46class SelectorEventLoopSignalTests(test_utils.TestCase): 47 48 def setUp(self): 49 super().setUp() 50 self.loop = asyncio.SelectorEventLoop() 51 self.set_event_loop(self.loop) 52 53 def test_check_signal(self): 54 self.assertRaises( 55 TypeError, self.loop._check_signal, '1') 56 self.assertRaises( 57 ValueError, self.loop._check_signal, signal.NSIG + 1) 58 59 def test_handle_signal_no_handler(self): 60 self.loop._handle_signal(signal.NSIG + 1) 61 62 def test_handle_signal_cancelled_handler(self): 63 h = asyncio.Handle(mock.Mock(), (), 64 loop=mock.Mock()) 65 h.cancel() 66 self.loop._signal_handlers[signal.NSIG + 1] = h 67 self.loop.remove_signal_handler = mock.Mock() 68 self.loop._handle_signal(signal.NSIG + 1) 69 self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1) 70 71 @mock.patch('asyncio.unix_events.signal') 72 def test_add_signal_handler_setup_error(self, m_signal): 73 m_signal.NSIG = signal.NSIG 74 m_signal.valid_signals = signal.valid_signals 75 m_signal.set_wakeup_fd.side_effect = ValueError 76 77 self.assertRaises( 78 RuntimeError, 79 self.loop.add_signal_handler, 80 signal.SIGINT, lambda: True) 81 82 @mock.patch('asyncio.unix_events.signal') 83 def test_add_signal_handler_coroutine_error(self, m_signal): 84 m_signal.NSIG = signal.NSIG 85 86 async def simple_coroutine(): 87 pass 88 89 # callback must not be a coroutine function 90 coro_func = simple_coroutine 91 coro_obj = coro_func() 92 self.addCleanup(coro_obj.close) 93 for func in (coro_func, coro_obj): 94 self.assertRaisesRegex( 95 TypeError, 'coroutines cannot be used with add_signal_handler', 96 self.loop.add_signal_handler, 97 signal.SIGINT, func) 98 99 @mock.patch('asyncio.unix_events.signal') 100 def test_add_signal_handler(self, m_signal): 101 m_signal.NSIG = signal.NSIG 102 m_signal.valid_signals = signal.valid_signals 103 104 cb = lambda: True 105 self.loop.add_signal_handler(signal.SIGHUP, cb) 106 h = self.loop._signal_handlers.get(signal.SIGHUP) 107 self.assertIsInstance(h, asyncio.Handle) 108 self.assertEqual(h._callback, cb) 109 110 @mock.patch('asyncio.unix_events.signal') 111 def test_add_signal_handler_install_error(self, m_signal): 112 m_signal.NSIG = signal.NSIG 113 m_signal.valid_signals = signal.valid_signals 114 115 def set_wakeup_fd(fd): 116 if fd == -1: 117 raise ValueError() 118 m_signal.set_wakeup_fd = set_wakeup_fd 119 120 class Err(OSError): 121 errno = errno.EFAULT 122 m_signal.signal.side_effect = Err 123 124 self.assertRaises( 125 Err, 126 self.loop.add_signal_handler, 127 signal.SIGINT, lambda: True) 128 129 @mock.patch('asyncio.unix_events.signal') 130 @mock.patch('asyncio.base_events.logger') 131 def test_add_signal_handler_install_error2(self, m_logging, m_signal): 132 m_signal.NSIG = signal.NSIG 133 m_signal.valid_signals = signal.valid_signals 134 135 class Err(OSError): 136 errno = errno.EINVAL 137 m_signal.signal.side_effect = Err 138 139 self.loop._signal_handlers[signal.SIGHUP] = lambda: True 140 self.assertRaises( 141 RuntimeError, 142 self.loop.add_signal_handler, 143 signal.SIGINT, lambda: True) 144 self.assertFalse(m_logging.info.called) 145 self.assertEqual(1, m_signal.set_wakeup_fd.call_count) 146 147 @mock.patch('asyncio.unix_events.signal') 148 @mock.patch('asyncio.base_events.logger') 149 def test_add_signal_handler_install_error3(self, m_logging, m_signal): 150 class Err(OSError): 151 errno = errno.EINVAL 152 m_signal.signal.side_effect = Err 153 m_signal.NSIG = signal.NSIG 154 m_signal.valid_signals = signal.valid_signals 155 156 self.assertRaises( 157 RuntimeError, 158 self.loop.add_signal_handler, 159 signal.SIGINT, lambda: True) 160 self.assertFalse(m_logging.info.called) 161 self.assertEqual(2, m_signal.set_wakeup_fd.call_count) 162 163 @mock.patch('asyncio.unix_events.signal') 164 def test_remove_signal_handler(self, m_signal): 165 m_signal.NSIG = signal.NSIG 166 m_signal.valid_signals = signal.valid_signals 167 168 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 169 170 self.assertTrue( 171 self.loop.remove_signal_handler(signal.SIGHUP)) 172 self.assertTrue(m_signal.set_wakeup_fd.called) 173 self.assertTrue(m_signal.signal.called) 174 self.assertEqual( 175 (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0]) 176 177 @mock.patch('asyncio.unix_events.signal') 178 def test_remove_signal_handler_2(self, m_signal): 179 m_signal.NSIG = signal.NSIG 180 m_signal.SIGINT = signal.SIGINT 181 m_signal.valid_signals = signal.valid_signals 182 183 self.loop.add_signal_handler(signal.SIGINT, lambda: True) 184 self.loop._signal_handlers[signal.SIGHUP] = object() 185 m_signal.set_wakeup_fd.reset_mock() 186 187 self.assertTrue( 188 self.loop.remove_signal_handler(signal.SIGINT)) 189 self.assertFalse(m_signal.set_wakeup_fd.called) 190 self.assertTrue(m_signal.signal.called) 191 self.assertEqual( 192 (signal.SIGINT, m_signal.default_int_handler), 193 m_signal.signal.call_args[0]) 194 195 @mock.patch('asyncio.unix_events.signal') 196 @mock.patch('asyncio.base_events.logger') 197 def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal): 198 m_signal.NSIG = signal.NSIG 199 m_signal.valid_signals = signal.valid_signals 200 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 201 202 m_signal.set_wakeup_fd.side_effect = ValueError 203 204 self.loop.remove_signal_handler(signal.SIGHUP) 205 self.assertTrue(m_logging.info) 206 207 @mock.patch('asyncio.unix_events.signal') 208 def test_remove_signal_handler_error(self, m_signal): 209 m_signal.NSIG = signal.NSIG 210 m_signal.valid_signals = signal.valid_signals 211 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 212 213 m_signal.signal.side_effect = OSError 214 215 self.assertRaises( 216 OSError, self.loop.remove_signal_handler, signal.SIGHUP) 217 218 @mock.patch('asyncio.unix_events.signal') 219 def test_remove_signal_handler_error2(self, m_signal): 220 m_signal.NSIG = signal.NSIG 221 m_signal.valid_signals = signal.valid_signals 222 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 223 224 class Err(OSError): 225 errno = errno.EINVAL 226 m_signal.signal.side_effect = Err 227 228 self.assertRaises( 229 RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP) 230 231 @mock.patch('asyncio.unix_events.signal') 232 def test_close(self, m_signal): 233 m_signal.NSIG = signal.NSIG 234 m_signal.valid_signals = signal.valid_signals 235 236 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 237 self.loop.add_signal_handler(signal.SIGCHLD, lambda: True) 238 239 self.assertEqual(len(self.loop._signal_handlers), 2) 240 241 m_signal.set_wakeup_fd.reset_mock() 242 243 self.loop.close() 244 245 self.assertEqual(len(self.loop._signal_handlers), 0) 246 m_signal.set_wakeup_fd.assert_called_once_with(-1) 247 248 @mock.patch('asyncio.unix_events.sys') 249 @mock.patch('asyncio.unix_events.signal') 250 def test_close_on_finalizing(self, m_signal, m_sys): 251 m_signal.NSIG = signal.NSIG 252 m_signal.valid_signals = signal.valid_signals 253 self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 254 255 self.assertEqual(len(self.loop._signal_handlers), 1) 256 m_sys.is_finalizing.return_value = True 257 m_signal.signal.reset_mock() 258 259 with self.assertWarnsRegex(ResourceWarning, 260 "skipping signal handlers removal"): 261 self.loop.close() 262 263 self.assertEqual(len(self.loop._signal_handlers), 0) 264 self.assertFalse(m_signal.signal.called) 265 266 267@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 268 'UNIX Sockets are not supported') 269class SelectorEventLoopUnixSocketTests(test_utils.TestCase): 270 271 def setUp(self): 272 super().setUp() 273 self.loop = asyncio.SelectorEventLoop() 274 self.set_event_loop(self.loop) 275 276 @support.skip_unless_bind_unix_socket 277 def test_create_unix_server_existing_path_sock(self): 278 with test_utils.unix_socket_path() as path: 279 sock = socket.socket(socket.AF_UNIX) 280 sock.bind(path) 281 sock.listen(1) 282 sock.close() 283 284 coro = self.loop.create_unix_server(lambda: None, path) 285 srv = self.loop.run_until_complete(coro) 286 srv.close() 287 self.loop.run_until_complete(srv.wait_closed()) 288 289 @support.skip_unless_bind_unix_socket 290 def test_create_unix_server_pathlib(self): 291 with test_utils.unix_socket_path() as path: 292 path = pathlib.Path(path) 293 srv_coro = self.loop.create_unix_server(lambda: None, path) 294 srv = self.loop.run_until_complete(srv_coro) 295 srv.close() 296 self.loop.run_until_complete(srv.wait_closed()) 297 298 def test_create_unix_connection_pathlib(self): 299 with test_utils.unix_socket_path() as path: 300 path = pathlib.Path(path) 301 coro = self.loop.create_unix_connection(lambda: None, path) 302 with self.assertRaises(FileNotFoundError): 303 # If pathlib.Path wasn't supported, the exception would be 304 # different. 305 self.loop.run_until_complete(coro) 306 307 def test_create_unix_server_existing_path_nonsock(self): 308 with tempfile.NamedTemporaryFile() as file: 309 coro = self.loop.create_unix_server(lambda: None, file.name) 310 with self.assertRaisesRegex(OSError, 311 'Address.*is already in use'): 312 self.loop.run_until_complete(coro) 313 314 def test_create_unix_server_ssl_bool(self): 315 coro = self.loop.create_unix_server(lambda: None, path='spam', 316 ssl=True) 317 with self.assertRaisesRegex(TypeError, 318 'ssl argument must be an SSLContext'): 319 self.loop.run_until_complete(coro) 320 321 def test_create_unix_server_nopath_nosock(self): 322 coro = self.loop.create_unix_server(lambda: None, path=None) 323 with self.assertRaisesRegex(ValueError, 324 'path was not specified, and no sock'): 325 self.loop.run_until_complete(coro) 326 327 def test_create_unix_server_path_inetsock(self): 328 sock = socket.socket() 329 with sock: 330 coro = self.loop.create_unix_server(lambda: None, path=None, 331 sock=sock) 332 with self.assertRaisesRegex(ValueError, 333 'A UNIX Domain Stream.*was expected'): 334 self.loop.run_until_complete(coro) 335 336 def test_create_unix_server_path_dgram(self): 337 sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 338 with sock: 339 coro = self.loop.create_unix_server(lambda: None, path=None, 340 sock=sock) 341 with self.assertRaisesRegex(ValueError, 342 'A UNIX Domain Stream.*was expected'): 343 self.loop.run_until_complete(coro) 344 345 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 346 'no socket.SOCK_NONBLOCK (linux only)') 347 @support.skip_unless_bind_unix_socket 348 def test_create_unix_server_path_stream_bittype(self): 349 sock = socket.socket( 350 socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 351 with tempfile.NamedTemporaryFile() as file: 352 fn = file.name 353 try: 354 with sock: 355 sock.bind(fn) 356 coro = self.loop.create_unix_server(lambda: None, path=None, 357 sock=sock) 358 srv = self.loop.run_until_complete(coro) 359 srv.close() 360 self.loop.run_until_complete(srv.wait_closed()) 361 finally: 362 os.unlink(fn) 363 364 def test_create_unix_server_ssl_timeout_with_plain_sock(self): 365 coro = self.loop.create_unix_server(lambda: None, path='spam', 366 ssl_handshake_timeout=1) 367 with self.assertRaisesRegex( 368 ValueError, 369 'ssl_handshake_timeout is only meaningful with ssl'): 370 self.loop.run_until_complete(coro) 371 372 def test_create_unix_connection_path_inetsock(self): 373 sock = socket.socket() 374 with sock: 375 coro = self.loop.create_unix_connection(lambda: None, 376 sock=sock) 377 with self.assertRaisesRegex(ValueError, 378 'A UNIX Domain Stream.*was expected'): 379 self.loop.run_until_complete(coro) 380 381 @mock.patch('asyncio.unix_events.socket') 382 def test_create_unix_server_bind_error(self, m_socket): 383 # Ensure that the socket is closed on any bind error 384 sock = mock.Mock() 385 m_socket.socket.return_value = sock 386 387 sock.bind.side_effect = OSError 388 coro = self.loop.create_unix_server(lambda: None, path="/test") 389 with self.assertRaises(OSError): 390 self.loop.run_until_complete(coro) 391 self.assertTrue(sock.close.called) 392 393 sock.bind.side_effect = MemoryError 394 coro = self.loop.create_unix_server(lambda: None, path="/test") 395 with self.assertRaises(MemoryError): 396 self.loop.run_until_complete(coro) 397 self.assertTrue(sock.close.called) 398 399 def test_create_unix_connection_path_sock(self): 400 coro = self.loop.create_unix_connection( 401 lambda: None, os.devnull, sock=object()) 402 with self.assertRaisesRegex(ValueError, 'path and sock can not be'): 403 self.loop.run_until_complete(coro) 404 405 def test_create_unix_connection_nopath_nosock(self): 406 coro = self.loop.create_unix_connection( 407 lambda: None, None) 408 with self.assertRaisesRegex(ValueError, 409 'no path and sock were specified'): 410 self.loop.run_until_complete(coro) 411 412 def test_create_unix_connection_nossl_serverhost(self): 413 coro = self.loop.create_unix_connection( 414 lambda: None, os.devnull, server_hostname='spam') 415 with self.assertRaisesRegex(ValueError, 416 'server_hostname is only meaningful'): 417 self.loop.run_until_complete(coro) 418 419 def test_create_unix_connection_ssl_noserverhost(self): 420 coro = self.loop.create_unix_connection( 421 lambda: None, os.devnull, ssl=True) 422 423 with self.assertRaisesRegex( 424 ValueError, 'you have to pass server_hostname when using ssl'): 425 426 self.loop.run_until_complete(coro) 427 428 def test_create_unix_connection_ssl_timeout_with_plain_sock(self): 429 coro = self.loop.create_unix_connection(lambda: None, path='spam', 430 ssl_handshake_timeout=1) 431 with self.assertRaisesRegex( 432 ValueError, 433 'ssl_handshake_timeout is only meaningful with ssl'): 434 self.loop.run_until_complete(coro) 435 436 437@unittest.skipUnless(hasattr(os, 'sendfile'), 438 'sendfile is not supported') 439class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): 440 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 441 442 class MyProto(asyncio.Protocol): 443 444 def __init__(self, loop): 445 self.started = False 446 self.closed = False 447 self.data = bytearray() 448 self.fut = loop.create_future() 449 self.transport = None 450 self._ready = loop.create_future() 451 452 def connection_made(self, transport): 453 self.started = True 454 self.transport = transport 455 self._ready.set_result(None) 456 457 def data_received(self, data): 458 self.data.extend(data) 459 460 def connection_lost(self, exc): 461 self.closed = True 462 self.fut.set_result(None) 463 464 async def wait_closed(self): 465 await self.fut 466 467 @classmethod 468 def setUpClass(cls): 469 with open(support.TESTFN, 'wb') as fp: 470 fp.write(cls.DATA) 471 super().setUpClass() 472 473 @classmethod 474 def tearDownClass(cls): 475 support.unlink(support.TESTFN) 476 super().tearDownClass() 477 478 def setUp(self): 479 self.loop = asyncio.new_event_loop() 480 self.set_event_loop(self.loop) 481 self.file = open(support.TESTFN, 'rb') 482 self.addCleanup(self.file.close) 483 super().setUp() 484 485 def make_socket(self, cleanup=True): 486 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 487 sock.setblocking(False) 488 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 489 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) 490 if cleanup: 491 self.addCleanup(sock.close) 492 return sock 493 494 def run_loop(self, coro): 495 return self.loop.run_until_complete(coro) 496 497 def prepare(self): 498 sock = self.make_socket() 499 proto = self.MyProto(self.loop) 500 port = support.find_unused_port() 501 srv_sock = self.make_socket(cleanup=False) 502 srv_sock.bind((support.HOST, port)) 503 server = self.run_loop(self.loop.create_server( 504 lambda: proto, sock=srv_sock)) 505 self.run_loop(self.loop.sock_connect(sock, (support.HOST, port))) 506 self.run_loop(proto._ready) 507 508 def cleanup(): 509 proto.transport.close() 510 self.run_loop(proto.wait_closed()) 511 512 server.close() 513 self.run_loop(server.wait_closed()) 514 515 self.addCleanup(cleanup) 516 517 return sock, proto 518 519 def test_sock_sendfile_not_available(self): 520 sock, proto = self.prepare() 521 with mock.patch('asyncio.unix_events.os', spec=[]): 522 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 523 "os[.]sendfile[(][)] is not available"): 524 self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 525 0, None)) 526 self.assertEqual(self.file.tell(), 0) 527 528 def test_sock_sendfile_not_a_file(self): 529 sock, proto = self.prepare() 530 f = object() 531 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 532 "not a regular file"): 533 self.run_loop(self.loop._sock_sendfile_native(sock, f, 534 0, None)) 535 self.assertEqual(self.file.tell(), 0) 536 537 def test_sock_sendfile_iobuffer(self): 538 sock, proto = self.prepare() 539 f = io.BytesIO() 540 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 541 "not a regular file"): 542 self.run_loop(self.loop._sock_sendfile_native(sock, f, 543 0, None)) 544 self.assertEqual(self.file.tell(), 0) 545 546 def test_sock_sendfile_not_regular_file(self): 547 sock, proto = self.prepare() 548 f = mock.Mock() 549 f.fileno.return_value = -1 550 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 551 "not a regular file"): 552 self.run_loop(self.loop._sock_sendfile_native(sock, f, 553 0, None)) 554 self.assertEqual(self.file.tell(), 0) 555 556 def test_sock_sendfile_cancel1(self): 557 sock, proto = self.prepare() 558 559 fut = self.loop.create_future() 560 fileno = self.file.fileno() 561 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 562 0, None, len(self.DATA), 0) 563 fut.cancel() 564 with contextlib.suppress(asyncio.CancelledError): 565 self.run_loop(fut) 566 with self.assertRaises(KeyError): 567 self.loop._selector.get_key(sock) 568 569 def test_sock_sendfile_cancel2(self): 570 sock, proto = self.prepare() 571 572 fut = self.loop.create_future() 573 fileno = self.file.fileno() 574 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 575 0, None, len(self.DATA), 0) 576 fut.cancel() 577 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno, 578 0, None, len(self.DATA), 0) 579 with self.assertRaises(KeyError): 580 self.loop._selector.get_key(sock) 581 582 def test_sock_sendfile_blocking_error(self): 583 sock, proto = self.prepare() 584 585 fileno = self.file.fileno() 586 fut = mock.Mock() 587 fut.cancelled.return_value = False 588 with mock.patch('os.sendfile', side_effect=BlockingIOError()): 589 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 590 0, None, len(self.DATA), 0) 591 key = self.loop._selector.get_key(sock) 592 self.assertIsNotNone(key) 593 fut.add_done_callback.assert_called_once_with(mock.ANY) 594 595 def test_sock_sendfile_os_error_first_call(self): 596 sock, proto = self.prepare() 597 598 fileno = self.file.fileno() 599 fut = self.loop.create_future() 600 with mock.patch('os.sendfile', side_effect=OSError()): 601 self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 602 0, None, len(self.DATA), 0) 603 with self.assertRaises(KeyError): 604 self.loop._selector.get_key(sock) 605 exc = fut.exception() 606 self.assertIsInstance(exc, asyncio.SendfileNotAvailableError) 607 self.assertEqual(0, self.file.tell()) 608 609 def test_sock_sendfile_os_error_next_call(self): 610 sock, proto = self.prepare() 611 612 fileno = self.file.fileno() 613 fut = self.loop.create_future() 614 err = OSError() 615 with mock.patch('os.sendfile', side_effect=err): 616 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 617 sock, fileno, 618 1000, None, len(self.DATA), 619 1000) 620 with self.assertRaises(KeyError): 621 self.loop._selector.get_key(sock) 622 exc = fut.exception() 623 self.assertIs(exc, err) 624 self.assertEqual(1000, self.file.tell()) 625 626 def test_sock_sendfile_exception(self): 627 sock, proto = self.prepare() 628 629 fileno = self.file.fileno() 630 fut = self.loop.create_future() 631 err = asyncio.SendfileNotAvailableError() 632 with mock.patch('os.sendfile', side_effect=err): 633 self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 634 sock, fileno, 635 1000, None, len(self.DATA), 636 1000) 637 with self.assertRaises(KeyError): 638 self.loop._selector.get_key(sock) 639 exc = fut.exception() 640 self.assertIs(exc, err) 641 self.assertEqual(1000, self.file.tell()) 642 643 644class UnixReadPipeTransportTests(test_utils.TestCase): 645 646 def setUp(self): 647 super().setUp() 648 self.loop = self.new_test_loop() 649 self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 650 self.pipe = mock.Mock(spec_set=io.RawIOBase) 651 self.pipe.fileno.return_value = 5 652 653 blocking_patcher = mock.patch('os.set_blocking') 654 blocking_patcher.start() 655 self.addCleanup(blocking_patcher.stop) 656 657 fstat_patcher = mock.patch('os.fstat') 658 m_fstat = fstat_patcher.start() 659 st = mock.Mock() 660 st.st_mode = stat.S_IFIFO 661 m_fstat.return_value = st 662 self.addCleanup(fstat_patcher.stop) 663 664 def read_pipe_transport(self, waiter=None): 665 transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe, 666 self.protocol, 667 waiter=waiter) 668 self.addCleanup(close_pipe_transport, transport) 669 return transport 670 671 def test_ctor(self): 672 waiter = self.loop.create_future() 673 tr = self.read_pipe_transport(waiter=waiter) 674 self.loop.run_until_complete(waiter) 675 676 self.protocol.connection_made.assert_called_with(tr) 677 self.loop.assert_reader(5, tr._read_ready) 678 self.assertIsNone(waiter.result()) 679 680 @mock.patch('os.read') 681 def test__read_ready(self, m_read): 682 tr = self.read_pipe_transport() 683 m_read.return_value = b'data' 684 tr._read_ready() 685 686 m_read.assert_called_with(5, tr.max_size) 687 self.protocol.data_received.assert_called_with(b'data') 688 689 @mock.patch('os.read') 690 def test__read_ready_eof(self, m_read): 691 tr = self.read_pipe_transport() 692 m_read.return_value = b'' 693 tr._read_ready() 694 695 m_read.assert_called_with(5, tr.max_size) 696 self.assertFalse(self.loop.readers) 697 test_utils.run_briefly(self.loop) 698 self.protocol.eof_received.assert_called_with() 699 self.protocol.connection_lost.assert_called_with(None) 700 701 @mock.patch('os.read') 702 def test__read_ready_blocked(self, m_read): 703 tr = self.read_pipe_transport() 704 m_read.side_effect = BlockingIOError 705 tr._read_ready() 706 707 m_read.assert_called_with(5, tr.max_size) 708 test_utils.run_briefly(self.loop) 709 self.assertFalse(self.protocol.data_received.called) 710 711 @mock.patch('asyncio.log.logger.error') 712 @mock.patch('os.read') 713 def test__read_ready_error(self, m_read, m_logexc): 714 tr = self.read_pipe_transport() 715 err = OSError() 716 m_read.side_effect = err 717 tr._close = mock.Mock() 718 tr._read_ready() 719 720 m_read.assert_called_with(5, tr.max_size) 721 tr._close.assert_called_with(err) 722 m_logexc.assert_called_with( 723 test_utils.MockPattern( 724 'Fatal read error on pipe transport' 725 '\nprotocol:.*\ntransport:.*'), 726 exc_info=(OSError, MOCK_ANY, MOCK_ANY)) 727 728 @mock.patch('os.read') 729 def test_pause_reading(self, m_read): 730 tr = self.read_pipe_transport() 731 m = mock.Mock() 732 self.loop.add_reader(5, m) 733 tr.pause_reading() 734 self.assertFalse(self.loop.readers) 735 736 @mock.patch('os.read') 737 def test_resume_reading(self, m_read): 738 tr = self.read_pipe_transport() 739 tr.pause_reading() 740 tr.resume_reading() 741 self.loop.assert_reader(5, tr._read_ready) 742 743 @mock.patch('os.read') 744 def test_close(self, m_read): 745 tr = self.read_pipe_transport() 746 tr._close = mock.Mock() 747 tr.close() 748 tr._close.assert_called_with(None) 749 750 @mock.patch('os.read') 751 def test_close_already_closing(self, m_read): 752 tr = self.read_pipe_transport() 753 tr._closing = True 754 tr._close = mock.Mock() 755 tr.close() 756 self.assertFalse(tr._close.called) 757 758 @mock.patch('os.read') 759 def test__close(self, m_read): 760 tr = self.read_pipe_transport() 761 err = object() 762 tr._close(err) 763 self.assertTrue(tr.is_closing()) 764 self.assertFalse(self.loop.readers) 765 test_utils.run_briefly(self.loop) 766 self.protocol.connection_lost.assert_called_with(err) 767 768 def test__call_connection_lost(self): 769 tr = self.read_pipe_transport() 770 self.assertIsNotNone(tr._protocol) 771 self.assertIsNotNone(tr._loop) 772 773 err = None 774 tr._call_connection_lost(err) 775 self.protocol.connection_lost.assert_called_with(err) 776 self.pipe.close.assert_called_with() 777 778 self.assertIsNone(tr._protocol) 779 self.assertIsNone(tr._loop) 780 781 def test__call_connection_lost_with_err(self): 782 tr = self.read_pipe_transport() 783 self.assertIsNotNone(tr._protocol) 784 self.assertIsNotNone(tr._loop) 785 786 err = OSError() 787 tr._call_connection_lost(err) 788 self.protocol.connection_lost.assert_called_with(err) 789 self.pipe.close.assert_called_with() 790 791 self.assertIsNone(tr._protocol) 792 self.assertIsNone(tr._loop) 793 794 def test_pause_reading_on_closed_pipe(self): 795 tr = self.read_pipe_transport() 796 tr.close() 797 test_utils.run_briefly(self.loop) 798 self.assertIsNone(tr._loop) 799 tr.pause_reading() 800 801 def test_pause_reading_on_paused_pipe(self): 802 tr = self.read_pipe_transport() 803 tr.pause_reading() 804 # the second call should do nothing 805 tr.pause_reading() 806 807 def test_resume_reading_on_closed_pipe(self): 808 tr = self.read_pipe_transport() 809 tr.close() 810 test_utils.run_briefly(self.loop) 811 self.assertIsNone(tr._loop) 812 tr.resume_reading() 813 814 def test_resume_reading_on_paused_pipe(self): 815 tr = self.read_pipe_transport() 816 # the pipe is not paused 817 # resuming should do nothing 818 tr.resume_reading() 819 820 821class UnixWritePipeTransportTests(test_utils.TestCase): 822 823 def setUp(self): 824 super().setUp() 825 self.loop = self.new_test_loop() 826 self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol) 827 self.pipe = mock.Mock(spec_set=io.RawIOBase) 828 self.pipe.fileno.return_value = 5 829 830 blocking_patcher = mock.patch('os.set_blocking') 831 blocking_patcher.start() 832 self.addCleanup(blocking_patcher.stop) 833 834 fstat_patcher = mock.patch('os.fstat') 835 m_fstat = fstat_patcher.start() 836 st = mock.Mock() 837 st.st_mode = stat.S_IFSOCK 838 m_fstat.return_value = st 839 self.addCleanup(fstat_patcher.stop) 840 841 def write_pipe_transport(self, waiter=None): 842 transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe, 843 self.protocol, 844 waiter=waiter) 845 self.addCleanup(close_pipe_transport, transport) 846 return transport 847 848 def test_ctor(self): 849 waiter = self.loop.create_future() 850 tr = self.write_pipe_transport(waiter=waiter) 851 self.loop.run_until_complete(waiter) 852 853 self.protocol.connection_made.assert_called_with(tr) 854 self.loop.assert_reader(5, tr._read_ready) 855 self.assertEqual(None, waiter.result()) 856 857 def test_can_write_eof(self): 858 tr = self.write_pipe_transport() 859 self.assertTrue(tr.can_write_eof()) 860 861 @mock.patch('os.write') 862 def test_write(self, m_write): 863 tr = self.write_pipe_transport() 864 m_write.return_value = 4 865 tr.write(b'data') 866 m_write.assert_called_with(5, b'data') 867 self.assertFalse(self.loop.writers) 868 self.assertEqual(bytearray(), tr._buffer) 869 870 @mock.patch('os.write') 871 def test_write_no_data(self, m_write): 872 tr = self.write_pipe_transport() 873 tr.write(b'') 874 self.assertFalse(m_write.called) 875 self.assertFalse(self.loop.writers) 876 self.assertEqual(bytearray(b''), tr._buffer) 877 878 @mock.patch('os.write') 879 def test_write_partial(self, m_write): 880 tr = self.write_pipe_transport() 881 m_write.return_value = 2 882 tr.write(b'data') 883 self.loop.assert_writer(5, tr._write_ready) 884 self.assertEqual(bytearray(b'ta'), tr._buffer) 885 886 @mock.patch('os.write') 887 def test_write_buffer(self, m_write): 888 tr = self.write_pipe_transport() 889 self.loop.add_writer(5, tr._write_ready) 890 tr._buffer = bytearray(b'previous') 891 tr.write(b'data') 892 self.assertFalse(m_write.called) 893 self.loop.assert_writer(5, tr._write_ready) 894 self.assertEqual(bytearray(b'previousdata'), tr._buffer) 895 896 @mock.patch('os.write') 897 def test_write_again(self, m_write): 898 tr = self.write_pipe_transport() 899 m_write.side_effect = BlockingIOError() 900 tr.write(b'data') 901 m_write.assert_called_with(5, bytearray(b'data')) 902 self.loop.assert_writer(5, tr._write_ready) 903 self.assertEqual(bytearray(b'data'), tr._buffer) 904 905 @mock.patch('asyncio.unix_events.logger') 906 @mock.patch('os.write') 907 def test_write_err(self, m_write, m_log): 908 tr = self.write_pipe_transport() 909 err = OSError() 910 m_write.side_effect = err 911 tr._fatal_error = mock.Mock() 912 tr.write(b'data') 913 m_write.assert_called_with(5, b'data') 914 self.assertFalse(self.loop.writers) 915 self.assertEqual(bytearray(), tr._buffer) 916 tr._fatal_error.assert_called_with( 917 err, 918 'Fatal write error on pipe transport') 919 self.assertEqual(1, tr._conn_lost) 920 921 tr.write(b'data') 922 self.assertEqual(2, tr._conn_lost) 923 tr.write(b'data') 924 tr.write(b'data') 925 tr.write(b'data') 926 tr.write(b'data') 927 # This is a bit overspecified. :-( 928 m_log.warning.assert_called_with( 929 'pipe closed by peer or os.write(pipe, data) raised exception.') 930 tr.close() 931 932 @mock.patch('os.write') 933 def test_write_close(self, m_write): 934 tr = self.write_pipe_transport() 935 tr._read_ready() # pipe was closed by peer 936 937 tr.write(b'data') 938 self.assertEqual(tr._conn_lost, 1) 939 tr.write(b'data') 940 self.assertEqual(tr._conn_lost, 2) 941 942 def test__read_ready(self): 943 tr = self.write_pipe_transport() 944 tr._read_ready() 945 self.assertFalse(self.loop.readers) 946 self.assertFalse(self.loop.writers) 947 self.assertTrue(tr.is_closing()) 948 test_utils.run_briefly(self.loop) 949 self.protocol.connection_lost.assert_called_with(None) 950 951 @mock.patch('os.write') 952 def test__write_ready(self, m_write): 953 tr = self.write_pipe_transport() 954 self.loop.add_writer(5, tr._write_ready) 955 tr._buffer = bytearray(b'data') 956 m_write.return_value = 4 957 tr._write_ready() 958 self.assertFalse(self.loop.writers) 959 self.assertEqual(bytearray(), tr._buffer) 960 961 @mock.patch('os.write') 962 def test__write_ready_partial(self, m_write): 963 tr = self.write_pipe_transport() 964 self.loop.add_writer(5, tr._write_ready) 965 tr._buffer = bytearray(b'data') 966 m_write.return_value = 3 967 tr._write_ready() 968 self.loop.assert_writer(5, tr._write_ready) 969 self.assertEqual(bytearray(b'a'), tr._buffer) 970 971 @mock.patch('os.write') 972 def test__write_ready_again(self, m_write): 973 tr = self.write_pipe_transport() 974 self.loop.add_writer(5, tr._write_ready) 975 tr._buffer = bytearray(b'data') 976 m_write.side_effect = BlockingIOError() 977 tr._write_ready() 978 m_write.assert_called_with(5, bytearray(b'data')) 979 self.loop.assert_writer(5, tr._write_ready) 980 self.assertEqual(bytearray(b'data'), tr._buffer) 981 982 @mock.patch('os.write') 983 def test__write_ready_empty(self, m_write): 984 tr = self.write_pipe_transport() 985 self.loop.add_writer(5, tr._write_ready) 986 tr._buffer = bytearray(b'data') 987 m_write.return_value = 0 988 tr._write_ready() 989 m_write.assert_called_with(5, bytearray(b'data')) 990 self.loop.assert_writer(5, tr._write_ready) 991 self.assertEqual(bytearray(b'data'), tr._buffer) 992 993 @mock.patch('asyncio.log.logger.error') 994 @mock.patch('os.write') 995 def test__write_ready_err(self, m_write, m_logexc): 996 tr = self.write_pipe_transport() 997 self.loop.add_writer(5, tr._write_ready) 998 tr._buffer = bytearray(b'data') 999 m_write.side_effect = err = OSError() 1000 tr._write_ready() 1001 self.assertFalse(self.loop.writers) 1002 self.assertFalse(self.loop.readers) 1003 self.assertEqual(bytearray(), tr._buffer) 1004 self.assertTrue(tr.is_closing()) 1005 m_logexc.assert_not_called() 1006 self.assertEqual(1, tr._conn_lost) 1007 test_utils.run_briefly(self.loop) 1008 self.protocol.connection_lost.assert_called_with(err) 1009 1010 @mock.patch('os.write') 1011 def test__write_ready_closing(self, m_write): 1012 tr = self.write_pipe_transport() 1013 self.loop.add_writer(5, tr._write_ready) 1014 tr._closing = True 1015 tr._buffer = bytearray(b'data') 1016 m_write.return_value = 4 1017 tr._write_ready() 1018 self.assertFalse(self.loop.writers) 1019 self.assertFalse(self.loop.readers) 1020 self.assertEqual(bytearray(), tr._buffer) 1021 self.protocol.connection_lost.assert_called_with(None) 1022 self.pipe.close.assert_called_with() 1023 1024 @mock.patch('os.write') 1025 def test_abort(self, m_write): 1026 tr = self.write_pipe_transport() 1027 self.loop.add_writer(5, tr._write_ready) 1028 self.loop.add_reader(5, tr._read_ready) 1029 tr._buffer = [b'da', b'ta'] 1030 tr.abort() 1031 self.assertFalse(m_write.called) 1032 self.assertFalse(self.loop.readers) 1033 self.assertFalse(self.loop.writers) 1034 self.assertEqual([], tr._buffer) 1035 self.assertTrue(tr.is_closing()) 1036 test_utils.run_briefly(self.loop) 1037 self.protocol.connection_lost.assert_called_with(None) 1038 1039 def test__call_connection_lost(self): 1040 tr = self.write_pipe_transport() 1041 self.assertIsNotNone(tr._protocol) 1042 self.assertIsNotNone(tr._loop) 1043 1044 err = None 1045 tr._call_connection_lost(err) 1046 self.protocol.connection_lost.assert_called_with(err) 1047 self.pipe.close.assert_called_with() 1048 1049 self.assertIsNone(tr._protocol) 1050 self.assertIsNone(tr._loop) 1051 1052 def test__call_connection_lost_with_err(self): 1053 tr = self.write_pipe_transport() 1054 self.assertIsNotNone(tr._protocol) 1055 self.assertIsNotNone(tr._loop) 1056 1057 err = OSError() 1058 tr._call_connection_lost(err) 1059 self.protocol.connection_lost.assert_called_with(err) 1060 self.pipe.close.assert_called_with() 1061 1062 self.assertIsNone(tr._protocol) 1063 self.assertIsNone(tr._loop) 1064 1065 def test_close(self): 1066 tr = self.write_pipe_transport() 1067 tr.write_eof = mock.Mock() 1068 tr.close() 1069 tr.write_eof.assert_called_with() 1070 1071 # closing the transport twice must not fail 1072 tr.close() 1073 1074 def test_close_closing(self): 1075 tr = self.write_pipe_transport() 1076 tr.write_eof = mock.Mock() 1077 tr._closing = True 1078 tr.close() 1079 self.assertFalse(tr.write_eof.called) 1080 1081 def test_write_eof(self): 1082 tr = self.write_pipe_transport() 1083 tr.write_eof() 1084 self.assertTrue(tr.is_closing()) 1085 self.assertFalse(self.loop.readers) 1086 test_utils.run_briefly(self.loop) 1087 self.protocol.connection_lost.assert_called_with(None) 1088 1089 def test_write_eof_pending(self): 1090 tr = self.write_pipe_transport() 1091 tr._buffer = [b'data'] 1092 tr.write_eof() 1093 self.assertTrue(tr.is_closing()) 1094 self.assertFalse(self.protocol.connection_lost.called) 1095 1096 1097class AbstractChildWatcherTests(unittest.TestCase): 1098 1099 def test_not_implemented(self): 1100 f = mock.Mock() 1101 watcher = asyncio.AbstractChildWatcher() 1102 self.assertRaises( 1103 NotImplementedError, watcher.add_child_handler, f, f) 1104 self.assertRaises( 1105 NotImplementedError, watcher.remove_child_handler, f) 1106 self.assertRaises( 1107 NotImplementedError, watcher.attach_loop, f) 1108 self.assertRaises( 1109 NotImplementedError, watcher.close) 1110 self.assertRaises( 1111 NotImplementedError, watcher.is_active) 1112 self.assertRaises( 1113 NotImplementedError, watcher.__enter__) 1114 self.assertRaises( 1115 NotImplementedError, watcher.__exit__, f, f, f) 1116 1117 1118class BaseChildWatcherTests(unittest.TestCase): 1119 1120 def test_not_implemented(self): 1121 f = mock.Mock() 1122 watcher = unix_events.BaseChildWatcher() 1123 self.assertRaises( 1124 NotImplementedError, watcher._do_waitpid, f) 1125 1126 1127WaitPidMocks = collections.namedtuple("WaitPidMocks", 1128 ("waitpid", 1129 "WIFEXITED", 1130 "WIFSIGNALED", 1131 "WEXITSTATUS", 1132 "WTERMSIG", 1133 )) 1134 1135 1136class ChildWatcherTestsMixin: 1137 1138 ignore_warnings = mock.patch.object(log.logger, "warning") 1139 1140 def setUp(self): 1141 super().setUp() 1142 self.loop = self.new_test_loop() 1143 self.running = False 1144 self.zombies = {} 1145 1146 with mock.patch.object( 1147 self.loop, "add_signal_handler") as self.m_add_signal_handler: 1148 self.watcher = self.create_watcher() 1149 self.watcher.attach_loop(self.loop) 1150 1151 def waitpid(self, pid, flags): 1152 if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1: 1153 self.assertGreater(pid, 0) 1154 try: 1155 if pid < 0: 1156 return self.zombies.popitem() 1157 else: 1158 return pid, self.zombies.pop(pid) 1159 except KeyError: 1160 pass 1161 if self.running: 1162 return 0, 0 1163 else: 1164 raise ChildProcessError() 1165 1166 def add_zombie(self, pid, returncode): 1167 self.zombies[pid] = returncode + 32768 1168 1169 def WIFEXITED(self, status): 1170 return status >= 32768 1171 1172 def WIFSIGNALED(self, status): 1173 return 32700 < status < 32768 1174 1175 def WEXITSTATUS(self, status): 1176 self.assertTrue(self.WIFEXITED(status)) 1177 return status - 32768 1178 1179 def WTERMSIG(self, status): 1180 self.assertTrue(self.WIFSIGNALED(status)) 1181 return 32768 - status 1182 1183 def test_create_watcher(self): 1184 self.m_add_signal_handler.assert_called_once_with( 1185 signal.SIGCHLD, self.watcher._sig_chld) 1186 1187 def waitpid_mocks(func): 1188 def wrapped_func(self): 1189 def patch(target, wrapper): 1190 return mock.patch(target, wraps=wrapper, 1191 new_callable=mock.Mock) 1192 1193 with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \ 1194 patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \ 1195 patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \ 1196 patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \ 1197 patch('os.waitpid', self.waitpid) as m_waitpid: 1198 func(self, WaitPidMocks(m_waitpid, 1199 m_WIFEXITED, m_WIFSIGNALED, 1200 m_WEXITSTATUS, m_WTERMSIG, 1201 )) 1202 return wrapped_func 1203 1204 @waitpid_mocks 1205 def test_sigchld(self, m): 1206 # register a child 1207 callback = mock.Mock() 1208 1209 with self.watcher: 1210 self.running = True 1211 self.watcher.add_child_handler(42, callback, 9, 10, 14) 1212 1213 self.assertFalse(callback.called) 1214 self.assertFalse(m.WIFEXITED.called) 1215 self.assertFalse(m.WIFSIGNALED.called) 1216 self.assertFalse(m.WEXITSTATUS.called) 1217 self.assertFalse(m.WTERMSIG.called) 1218 1219 # child is running 1220 self.watcher._sig_chld() 1221 1222 self.assertFalse(callback.called) 1223 self.assertFalse(m.WIFEXITED.called) 1224 self.assertFalse(m.WIFSIGNALED.called) 1225 self.assertFalse(m.WEXITSTATUS.called) 1226 self.assertFalse(m.WTERMSIG.called) 1227 1228 # child terminates (returncode 12) 1229 self.running = False 1230 self.add_zombie(42, 12) 1231 self.watcher._sig_chld() 1232 1233 self.assertTrue(m.WIFEXITED.called) 1234 self.assertTrue(m.WEXITSTATUS.called) 1235 self.assertFalse(m.WTERMSIG.called) 1236 callback.assert_called_once_with(42, 12, 9, 10, 14) 1237 1238 m.WIFSIGNALED.reset_mock() 1239 m.WIFEXITED.reset_mock() 1240 m.WEXITSTATUS.reset_mock() 1241 callback.reset_mock() 1242 1243 # ensure that the child is effectively reaped 1244 self.add_zombie(42, 13) 1245 with self.ignore_warnings: 1246 self.watcher._sig_chld() 1247 1248 self.assertFalse(callback.called) 1249 self.assertFalse(m.WTERMSIG.called) 1250 1251 m.WIFSIGNALED.reset_mock() 1252 m.WIFEXITED.reset_mock() 1253 m.WEXITSTATUS.reset_mock() 1254 1255 # sigchld called again 1256 self.zombies.clear() 1257 self.watcher._sig_chld() 1258 1259 self.assertFalse(callback.called) 1260 self.assertFalse(m.WIFEXITED.called) 1261 self.assertFalse(m.WIFSIGNALED.called) 1262 self.assertFalse(m.WEXITSTATUS.called) 1263 self.assertFalse(m.WTERMSIG.called) 1264 1265 @waitpid_mocks 1266 def test_sigchld_two_children(self, m): 1267 callback1 = mock.Mock() 1268 callback2 = mock.Mock() 1269 1270 # register child 1 1271 with self.watcher: 1272 self.running = True 1273 self.watcher.add_child_handler(43, callback1, 7, 8) 1274 1275 self.assertFalse(callback1.called) 1276 self.assertFalse(callback2.called) 1277 self.assertFalse(m.WIFEXITED.called) 1278 self.assertFalse(m.WIFSIGNALED.called) 1279 self.assertFalse(m.WEXITSTATUS.called) 1280 self.assertFalse(m.WTERMSIG.called) 1281 1282 # register child 2 1283 with self.watcher: 1284 self.watcher.add_child_handler(44, callback2, 147, 18) 1285 1286 self.assertFalse(callback1.called) 1287 self.assertFalse(callback2.called) 1288 self.assertFalse(m.WIFEXITED.called) 1289 self.assertFalse(m.WIFSIGNALED.called) 1290 self.assertFalse(m.WEXITSTATUS.called) 1291 self.assertFalse(m.WTERMSIG.called) 1292 1293 # children are running 1294 self.watcher._sig_chld() 1295 1296 self.assertFalse(callback1.called) 1297 self.assertFalse(callback2.called) 1298 self.assertFalse(m.WIFEXITED.called) 1299 self.assertFalse(m.WIFSIGNALED.called) 1300 self.assertFalse(m.WEXITSTATUS.called) 1301 self.assertFalse(m.WTERMSIG.called) 1302 1303 # child 1 terminates (signal 3) 1304 self.add_zombie(43, -3) 1305 self.watcher._sig_chld() 1306 1307 callback1.assert_called_once_with(43, -3, 7, 8) 1308 self.assertFalse(callback2.called) 1309 self.assertTrue(m.WIFSIGNALED.called) 1310 self.assertFalse(m.WEXITSTATUS.called) 1311 self.assertTrue(m.WTERMSIG.called) 1312 1313 m.WIFSIGNALED.reset_mock() 1314 m.WIFEXITED.reset_mock() 1315 m.WTERMSIG.reset_mock() 1316 callback1.reset_mock() 1317 1318 # child 2 still running 1319 self.watcher._sig_chld() 1320 1321 self.assertFalse(callback1.called) 1322 self.assertFalse(callback2.called) 1323 self.assertFalse(m.WIFEXITED.called) 1324 self.assertFalse(m.WIFSIGNALED.called) 1325 self.assertFalse(m.WEXITSTATUS.called) 1326 self.assertFalse(m.WTERMSIG.called) 1327 1328 # child 2 terminates (code 108) 1329 self.add_zombie(44, 108) 1330 self.running = False 1331 self.watcher._sig_chld() 1332 1333 callback2.assert_called_once_with(44, 108, 147, 18) 1334 self.assertFalse(callback1.called) 1335 self.assertTrue(m.WIFEXITED.called) 1336 self.assertTrue(m.WEXITSTATUS.called) 1337 self.assertFalse(m.WTERMSIG.called) 1338 1339 m.WIFSIGNALED.reset_mock() 1340 m.WIFEXITED.reset_mock() 1341 m.WEXITSTATUS.reset_mock() 1342 callback2.reset_mock() 1343 1344 # ensure that the children are effectively reaped 1345 self.add_zombie(43, 14) 1346 self.add_zombie(44, 15) 1347 with self.ignore_warnings: 1348 self.watcher._sig_chld() 1349 1350 self.assertFalse(callback1.called) 1351 self.assertFalse(callback2.called) 1352 self.assertFalse(m.WTERMSIG.called) 1353 1354 m.WIFSIGNALED.reset_mock() 1355 m.WIFEXITED.reset_mock() 1356 m.WEXITSTATUS.reset_mock() 1357 1358 # sigchld called again 1359 self.zombies.clear() 1360 self.watcher._sig_chld() 1361 1362 self.assertFalse(callback1.called) 1363 self.assertFalse(callback2.called) 1364 self.assertFalse(m.WIFEXITED.called) 1365 self.assertFalse(m.WIFSIGNALED.called) 1366 self.assertFalse(m.WEXITSTATUS.called) 1367 self.assertFalse(m.WTERMSIG.called) 1368 1369 @waitpid_mocks 1370 def test_sigchld_two_children_terminating_together(self, m): 1371 callback1 = mock.Mock() 1372 callback2 = mock.Mock() 1373 1374 # register child 1 1375 with self.watcher: 1376 self.running = True 1377 self.watcher.add_child_handler(45, callback1, 17, 8) 1378 1379 self.assertFalse(callback1.called) 1380 self.assertFalse(callback2.called) 1381 self.assertFalse(m.WIFEXITED.called) 1382 self.assertFalse(m.WIFSIGNALED.called) 1383 self.assertFalse(m.WEXITSTATUS.called) 1384 self.assertFalse(m.WTERMSIG.called) 1385 1386 # register child 2 1387 with self.watcher: 1388 self.watcher.add_child_handler(46, callback2, 1147, 18) 1389 1390 self.assertFalse(callback1.called) 1391 self.assertFalse(callback2.called) 1392 self.assertFalse(m.WIFEXITED.called) 1393 self.assertFalse(m.WIFSIGNALED.called) 1394 self.assertFalse(m.WEXITSTATUS.called) 1395 self.assertFalse(m.WTERMSIG.called) 1396 1397 # children are running 1398 self.watcher._sig_chld() 1399 1400 self.assertFalse(callback1.called) 1401 self.assertFalse(callback2.called) 1402 self.assertFalse(m.WIFEXITED.called) 1403 self.assertFalse(m.WIFSIGNALED.called) 1404 self.assertFalse(m.WEXITSTATUS.called) 1405 self.assertFalse(m.WTERMSIG.called) 1406 1407 # child 1 terminates (code 78) 1408 # child 2 terminates (signal 5) 1409 self.add_zombie(45, 78) 1410 self.add_zombie(46, -5) 1411 self.running = False 1412 self.watcher._sig_chld() 1413 1414 callback1.assert_called_once_with(45, 78, 17, 8) 1415 callback2.assert_called_once_with(46, -5, 1147, 18) 1416 self.assertTrue(m.WIFSIGNALED.called) 1417 self.assertTrue(m.WIFEXITED.called) 1418 self.assertTrue(m.WEXITSTATUS.called) 1419 self.assertTrue(m.WTERMSIG.called) 1420 1421 m.WIFSIGNALED.reset_mock() 1422 m.WIFEXITED.reset_mock() 1423 m.WTERMSIG.reset_mock() 1424 m.WEXITSTATUS.reset_mock() 1425 callback1.reset_mock() 1426 callback2.reset_mock() 1427 1428 # ensure that the children are effectively reaped 1429 self.add_zombie(45, 14) 1430 self.add_zombie(46, 15) 1431 with self.ignore_warnings: 1432 self.watcher._sig_chld() 1433 1434 self.assertFalse(callback1.called) 1435 self.assertFalse(callback2.called) 1436 self.assertFalse(m.WTERMSIG.called) 1437 1438 @waitpid_mocks 1439 def test_sigchld_race_condition(self, m): 1440 # register a child 1441 callback = mock.Mock() 1442 1443 with self.watcher: 1444 # child terminates before being registered 1445 self.add_zombie(50, 4) 1446 self.watcher._sig_chld() 1447 1448 self.watcher.add_child_handler(50, callback, 1, 12) 1449 1450 callback.assert_called_once_with(50, 4, 1, 12) 1451 callback.reset_mock() 1452 1453 # ensure that the child is effectively reaped 1454 self.add_zombie(50, -1) 1455 with self.ignore_warnings: 1456 self.watcher._sig_chld() 1457 1458 self.assertFalse(callback.called) 1459 1460 @waitpid_mocks 1461 def test_sigchld_replace_handler(self, m): 1462 callback1 = mock.Mock() 1463 callback2 = mock.Mock() 1464 1465 # register a child 1466 with self.watcher: 1467 self.running = True 1468 self.watcher.add_child_handler(51, callback1, 19) 1469 1470 self.assertFalse(callback1.called) 1471 self.assertFalse(callback2.called) 1472 self.assertFalse(m.WIFEXITED.called) 1473 self.assertFalse(m.WIFSIGNALED.called) 1474 self.assertFalse(m.WEXITSTATUS.called) 1475 self.assertFalse(m.WTERMSIG.called) 1476 1477 # register the same child again 1478 with self.watcher: 1479 self.watcher.add_child_handler(51, callback2, 21) 1480 1481 self.assertFalse(callback1.called) 1482 self.assertFalse(callback2.called) 1483 self.assertFalse(m.WIFEXITED.called) 1484 self.assertFalse(m.WIFSIGNALED.called) 1485 self.assertFalse(m.WEXITSTATUS.called) 1486 self.assertFalse(m.WTERMSIG.called) 1487 1488 # child terminates (signal 8) 1489 self.running = False 1490 self.add_zombie(51, -8) 1491 self.watcher._sig_chld() 1492 1493 callback2.assert_called_once_with(51, -8, 21) 1494 self.assertFalse(callback1.called) 1495 self.assertTrue(m.WIFSIGNALED.called) 1496 self.assertFalse(m.WEXITSTATUS.called) 1497 self.assertTrue(m.WTERMSIG.called) 1498 1499 m.WIFSIGNALED.reset_mock() 1500 m.WIFEXITED.reset_mock() 1501 m.WTERMSIG.reset_mock() 1502 callback2.reset_mock() 1503 1504 # ensure that the child is effectively reaped 1505 self.add_zombie(51, 13) 1506 with self.ignore_warnings: 1507 self.watcher._sig_chld() 1508 1509 self.assertFalse(callback1.called) 1510 self.assertFalse(callback2.called) 1511 self.assertFalse(m.WTERMSIG.called) 1512 1513 @waitpid_mocks 1514 def test_sigchld_remove_handler(self, m): 1515 callback = mock.Mock() 1516 1517 # register a child 1518 with self.watcher: 1519 self.running = True 1520 self.watcher.add_child_handler(52, callback, 1984) 1521 1522 self.assertFalse(callback.called) 1523 self.assertFalse(m.WIFEXITED.called) 1524 self.assertFalse(m.WIFSIGNALED.called) 1525 self.assertFalse(m.WEXITSTATUS.called) 1526 self.assertFalse(m.WTERMSIG.called) 1527 1528 # unregister the child 1529 self.watcher.remove_child_handler(52) 1530 1531 self.assertFalse(callback.called) 1532 self.assertFalse(m.WIFEXITED.called) 1533 self.assertFalse(m.WIFSIGNALED.called) 1534 self.assertFalse(m.WEXITSTATUS.called) 1535 self.assertFalse(m.WTERMSIG.called) 1536 1537 # child terminates (code 99) 1538 self.running = False 1539 self.add_zombie(52, 99) 1540 with self.ignore_warnings: 1541 self.watcher._sig_chld() 1542 1543 self.assertFalse(callback.called) 1544 1545 @waitpid_mocks 1546 def test_sigchld_unknown_status(self, m): 1547 callback = mock.Mock() 1548 1549 # register a child 1550 with self.watcher: 1551 self.running = True 1552 self.watcher.add_child_handler(53, callback, -19) 1553 1554 self.assertFalse(callback.called) 1555 self.assertFalse(m.WIFEXITED.called) 1556 self.assertFalse(m.WIFSIGNALED.called) 1557 self.assertFalse(m.WEXITSTATUS.called) 1558 self.assertFalse(m.WTERMSIG.called) 1559 1560 # terminate with unknown status 1561 self.zombies[53] = 1178 1562 self.running = False 1563 self.watcher._sig_chld() 1564 1565 callback.assert_called_once_with(53, 1178, -19) 1566 self.assertTrue(m.WIFEXITED.called) 1567 self.assertTrue(m.WIFSIGNALED.called) 1568 self.assertFalse(m.WEXITSTATUS.called) 1569 self.assertFalse(m.WTERMSIG.called) 1570 1571 callback.reset_mock() 1572 m.WIFEXITED.reset_mock() 1573 m.WIFSIGNALED.reset_mock() 1574 1575 # ensure that the child is effectively reaped 1576 self.add_zombie(53, 101) 1577 with self.ignore_warnings: 1578 self.watcher._sig_chld() 1579 1580 self.assertFalse(callback.called) 1581 1582 @waitpid_mocks 1583 def test_remove_child_handler(self, m): 1584 callback1 = mock.Mock() 1585 callback2 = mock.Mock() 1586 callback3 = mock.Mock() 1587 1588 # register children 1589 with self.watcher: 1590 self.running = True 1591 self.watcher.add_child_handler(54, callback1, 1) 1592 self.watcher.add_child_handler(55, callback2, 2) 1593 self.watcher.add_child_handler(56, callback3, 3) 1594 1595 # remove child handler 1 1596 self.assertTrue(self.watcher.remove_child_handler(54)) 1597 1598 # remove child handler 2 multiple times 1599 self.assertTrue(self.watcher.remove_child_handler(55)) 1600 self.assertFalse(self.watcher.remove_child_handler(55)) 1601 self.assertFalse(self.watcher.remove_child_handler(55)) 1602 1603 # all children terminate 1604 self.add_zombie(54, 0) 1605 self.add_zombie(55, 1) 1606 self.add_zombie(56, 2) 1607 self.running = False 1608 with self.ignore_warnings: 1609 self.watcher._sig_chld() 1610 1611 self.assertFalse(callback1.called) 1612 self.assertFalse(callback2.called) 1613 callback3.assert_called_once_with(56, 2, 3) 1614 1615 @waitpid_mocks 1616 def test_sigchld_unhandled_exception(self, m): 1617 callback = mock.Mock() 1618 1619 # register a child 1620 with self.watcher: 1621 self.running = True 1622 self.watcher.add_child_handler(57, callback) 1623 1624 # raise an exception 1625 m.waitpid.side_effect = ValueError 1626 1627 with mock.patch.object(log.logger, 1628 'error') as m_error: 1629 1630 self.assertEqual(self.watcher._sig_chld(), None) 1631 self.assertTrue(m_error.called) 1632 1633 @waitpid_mocks 1634 def test_sigchld_child_reaped_elsewhere(self, m): 1635 # register a child 1636 callback = mock.Mock() 1637 1638 with self.watcher: 1639 self.running = True 1640 self.watcher.add_child_handler(58, callback) 1641 1642 self.assertFalse(callback.called) 1643 self.assertFalse(m.WIFEXITED.called) 1644 self.assertFalse(m.WIFSIGNALED.called) 1645 self.assertFalse(m.WEXITSTATUS.called) 1646 self.assertFalse(m.WTERMSIG.called) 1647 1648 # child terminates 1649 self.running = False 1650 self.add_zombie(58, 4) 1651 1652 # waitpid is called elsewhere 1653 os.waitpid(58, os.WNOHANG) 1654 1655 m.waitpid.reset_mock() 1656 1657 # sigchld 1658 with self.ignore_warnings: 1659 self.watcher._sig_chld() 1660 1661 if isinstance(self.watcher, asyncio.FastChildWatcher): 1662 # here the FastChildWatche enters a deadlock 1663 # (there is no way to prevent it) 1664 self.assertFalse(callback.called) 1665 else: 1666 callback.assert_called_once_with(58, 255) 1667 1668 @waitpid_mocks 1669 def test_sigchld_unknown_pid_during_registration(self, m): 1670 # register two children 1671 callback1 = mock.Mock() 1672 callback2 = mock.Mock() 1673 1674 with self.ignore_warnings, self.watcher: 1675 self.running = True 1676 # child 1 terminates 1677 self.add_zombie(591, 7) 1678 # an unknown child terminates 1679 self.add_zombie(593, 17) 1680 1681 self.watcher._sig_chld() 1682 1683 self.watcher.add_child_handler(591, callback1) 1684 self.watcher.add_child_handler(592, callback2) 1685 1686 callback1.assert_called_once_with(591, 7) 1687 self.assertFalse(callback2.called) 1688 1689 @waitpid_mocks 1690 def test_set_loop(self, m): 1691 # register a child 1692 callback = mock.Mock() 1693 1694 with self.watcher: 1695 self.running = True 1696 self.watcher.add_child_handler(60, callback) 1697 1698 # attach a new loop 1699 old_loop = self.loop 1700 self.loop = self.new_test_loop() 1701 patch = mock.patch.object 1702 1703 with patch(old_loop, "remove_signal_handler") as m_old_remove, \ 1704 patch(self.loop, "add_signal_handler") as m_new_add: 1705 1706 self.watcher.attach_loop(self.loop) 1707 1708 m_old_remove.assert_called_once_with( 1709 signal.SIGCHLD) 1710 m_new_add.assert_called_once_with( 1711 signal.SIGCHLD, self.watcher._sig_chld) 1712 1713 # child terminates 1714 self.running = False 1715 self.add_zombie(60, 9) 1716 self.watcher._sig_chld() 1717 1718 callback.assert_called_once_with(60, 9) 1719 1720 @waitpid_mocks 1721 def test_set_loop_race_condition(self, m): 1722 # register 3 children 1723 callback1 = mock.Mock() 1724 callback2 = mock.Mock() 1725 callback3 = mock.Mock() 1726 1727 with self.watcher: 1728 self.running = True 1729 self.watcher.add_child_handler(61, callback1) 1730 self.watcher.add_child_handler(62, callback2) 1731 self.watcher.add_child_handler(622, callback3) 1732 1733 # detach the loop 1734 old_loop = self.loop 1735 self.loop = None 1736 1737 with mock.patch.object( 1738 old_loop, "remove_signal_handler") as m_remove_signal_handler: 1739 1740 with self.assertWarnsRegex( 1741 RuntimeWarning, 'A loop is being detached'): 1742 self.watcher.attach_loop(None) 1743 1744 m_remove_signal_handler.assert_called_once_with( 1745 signal.SIGCHLD) 1746 1747 # child 1 & 2 terminate 1748 self.add_zombie(61, 11) 1749 self.add_zombie(62, -5) 1750 1751 # SIGCHLD was not caught 1752 self.assertFalse(callback1.called) 1753 self.assertFalse(callback2.called) 1754 self.assertFalse(callback3.called) 1755 1756 # attach a new loop 1757 self.loop = self.new_test_loop() 1758 1759 with mock.patch.object( 1760 self.loop, "add_signal_handler") as m_add_signal_handler: 1761 1762 self.watcher.attach_loop(self.loop) 1763 1764 m_add_signal_handler.assert_called_once_with( 1765 signal.SIGCHLD, self.watcher._sig_chld) 1766 callback1.assert_called_once_with(61, 11) # race condition! 1767 callback2.assert_called_once_with(62, -5) # race condition! 1768 self.assertFalse(callback3.called) 1769 1770 callback1.reset_mock() 1771 callback2.reset_mock() 1772 1773 # child 3 terminates 1774 self.running = False 1775 self.add_zombie(622, 19) 1776 self.watcher._sig_chld() 1777 1778 self.assertFalse(callback1.called) 1779 self.assertFalse(callback2.called) 1780 callback3.assert_called_once_with(622, 19) 1781 1782 @waitpid_mocks 1783 def test_close(self, m): 1784 # register two children 1785 callback1 = mock.Mock() 1786 1787 with self.watcher: 1788 self.running = True 1789 # child 1 terminates 1790 self.add_zombie(63, 9) 1791 # other child terminates 1792 self.add_zombie(65, 18) 1793 self.watcher._sig_chld() 1794 1795 self.watcher.add_child_handler(63, callback1) 1796 self.watcher.add_child_handler(64, callback1) 1797 1798 self.assertEqual(len(self.watcher._callbacks), 1) 1799 if isinstance(self.watcher, asyncio.FastChildWatcher): 1800 self.assertEqual(len(self.watcher._zombies), 1) 1801 1802 with mock.patch.object( 1803 self.loop, 1804 "remove_signal_handler") as m_remove_signal_handler: 1805 1806 self.watcher.close() 1807 1808 m_remove_signal_handler.assert_called_once_with( 1809 signal.SIGCHLD) 1810 self.assertFalse(self.watcher._callbacks) 1811 if isinstance(self.watcher, asyncio.FastChildWatcher): 1812 self.assertFalse(self.watcher._zombies) 1813 1814 1815class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): 1816 def create_watcher(self): 1817 return asyncio.SafeChildWatcher() 1818 1819 1820class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): 1821 def create_watcher(self): 1822 return asyncio.FastChildWatcher() 1823 1824 1825class PolicyTests(unittest.TestCase): 1826 1827 def create_policy(self): 1828 return asyncio.DefaultEventLoopPolicy() 1829 1830 def test_get_default_child_watcher(self): 1831 policy = self.create_policy() 1832 self.assertIsNone(policy._watcher) 1833 1834 watcher = policy.get_child_watcher() 1835 self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) 1836 1837 self.assertIs(policy._watcher, watcher) 1838 1839 self.assertIs(watcher, policy.get_child_watcher()) 1840 1841 def test_get_child_watcher_after_set(self): 1842 policy = self.create_policy() 1843 watcher = asyncio.FastChildWatcher() 1844 1845 policy.set_child_watcher(watcher) 1846 self.assertIs(policy._watcher, watcher) 1847 self.assertIs(watcher, policy.get_child_watcher()) 1848 1849 def test_get_child_watcher_thread(self): 1850 1851 def f(): 1852 policy.set_event_loop(policy.new_event_loop()) 1853 1854 self.assertIsInstance(policy.get_event_loop(), 1855 asyncio.AbstractEventLoop) 1856 watcher = policy.get_child_watcher() 1857 1858 self.assertIsInstance(watcher, asyncio.SafeChildWatcher) 1859 self.assertIsNone(watcher._loop) 1860 1861 policy.get_event_loop().close() 1862 1863 policy = self.create_policy() 1864 policy.set_child_watcher(asyncio.SafeChildWatcher()) 1865 1866 th = threading.Thread(target=f) 1867 th.start() 1868 th.join() 1869 1870 def test_child_watcher_replace_mainloop_existing(self): 1871 policy = self.create_policy() 1872 loop = policy.get_event_loop() 1873 1874 # Explicitly setup SafeChildWatcher, 1875 # default ThreadedChildWatcher has no _loop property 1876 watcher = asyncio.SafeChildWatcher() 1877 policy.set_child_watcher(watcher) 1878 watcher.attach_loop(loop) 1879 1880 self.assertIs(watcher._loop, loop) 1881 1882 new_loop = policy.new_event_loop() 1883 policy.set_event_loop(new_loop) 1884 1885 self.assertIs(watcher._loop, new_loop) 1886 1887 policy.set_event_loop(None) 1888 1889 self.assertIs(watcher._loop, None) 1890 1891 loop.close() 1892 new_loop.close() 1893 1894 1895class TestFunctional(unittest.TestCase): 1896 1897 def setUp(self): 1898 self.loop = asyncio.new_event_loop() 1899 asyncio.set_event_loop(self.loop) 1900 1901 def tearDown(self): 1902 self.loop.close() 1903 asyncio.set_event_loop(None) 1904 1905 def test_add_reader_invalid_argument(self): 1906 def assert_raises(): 1907 return self.assertRaisesRegex(ValueError, r'Invalid file object') 1908 1909 cb = lambda: None 1910 1911 with assert_raises(): 1912 self.loop.add_reader(object(), cb) 1913 with assert_raises(): 1914 self.loop.add_writer(object(), cb) 1915 1916 with assert_raises(): 1917 self.loop.remove_reader(object()) 1918 with assert_raises(): 1919 self.loop.remove_writer(object()) 1920 1921 def test_add_reader_or_writer_transport_fd(self): 1922 def assert_raises(): 1923 return self.assertRaisesRegex( 1924 RuntimeError, 1925 r'File descriptor .* is used by transport') 1926 1927 async def runner(): 1928 tr, pr = await self.loop.create_connection( 1929 lambda: asyncio.Protocol(), sock=rsock) 1930 1931 try: 1932 cb = lambda: None 1933 1934 with assert_raises(): 1935 self.loop.add_reader(rsock, cb) 1936 with assert_raises(): 1937 self.loop.add_reader(rsock.fileno(), cb) 1938 1939 with assert_raises(): 1940 self.loop.remove_reader(rsock) 1941 with assert_raises(): 1942 self.loop.remove_reader(rsock.fileno()) 1943 1944 with assert_raises(): 1945 self.loop.add_writer(rsock, cb) 1946 with assert_raises(): 1947 self.loop.add_writer(rsock.fileno(), cb) 1948 1949 with assert_raises(): 1950 self.loop.remove_writer(rsock) 1951 with assert_raises(): 1952 self.loop.remove_writer(rsock.fileno()) 1953 1954 finally: 1955 tr.close() 1956 1957 rsock, wsock = socket.socketpair() 1958 try: 1959 self.loop.run_until_complete(runner()) 1960 finally: 1961 rsock.close() 1962 wsock.close() 1963 1964 1965if __name__ == '__main__': 1966 unittest.main() 1967