• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for unix_events.py."""
2
3import collections
4import contextlib
5import errno
6import io
7import os
8import pathlib
9import signal
10import socket
11import stat
12import sys
13import tempfile
14import threading
15import unittest
16from unittest import mock
17from test import support
18
19if sys.platform == 'win32':
20    raise unittest.SkipTest('UNIX only')
21
22
23import asyncio
24from asyncio import log
25from asyncio import unix_events
26from test.test_asyncio import utils as test_utils
27
28
29MOCK_ANY = mock.ANY
30
31
def tearDownModule():
    """Module-level unittest hook: pass None to set_event_loop_policy().

    Runs once after the tests of this module so that whatever event loop
    policy they installed does not stay in place afterwards.
    """
    asyncio.set_event_loop_policy(None)
34
35
def close_pipe_transport(transport):
    """Close the pipe object held by *transport* and forget it.

    transport.close() is deliberately not called because the event loop and
    the selector are mocked.  Nothing happens when ``transport._pipe`` is
    already None.
    """
    pipe = transport._pipe
    if pipe is not None:
        pipe.close()
        transport._pipe = None
43
44
@unittest.skipUnless(signal, 'Signals are not supported')
class SelectorEventLoopSignalTests(test_utils.TestCase):
    # Tests for the signal-handler bookkeeping of SelectorEventLoop.  Most
    # tests replace asyncio.unix_events.signal with a mock; NSIG and
    # valid_signals are copied from the real module where the test needs
    # them.

    def setUp(self):
        super().setUp()
        self.loop = asyncio.SelectorEventLoop()
        self.set_event_loop(self.loop)

    def test_check_signal(self):
        # A str must be rejected with TypeError, NSIG + 1 with ValueError.
        self.assertRaises(
            TypeError, self.loop._check_signal, '1')
        self.assertRaises(
            ValueError, self.loop._check_signal, signal.NSIG + 1)

    def test_handle_signal_no_handler(self):
        # Must not raise when nothing is registered for the number.
        self.loop._handle_signal(signal.NSIG + 1)

    def test_handle_signal_cancelled_handler(self):
        # Dispatching to a cancelled handle must unregister that signal.
        h = asyncio.Handle(mock.Mock(), (),
                           loop=mock.Mock())
        h.cancel()
        self.loop._signal_handlers[signal.NSIG + 1] = h
        self.loop.remove_signal_handler = mock.Mock()
        self.loop._handle_signal(signal.NSIG + 1)
        self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler_setup_error(self, m_signal):
        # A ValueError from set_wakeup_fd() must surface as RuntimeError.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        m_signal.set_wakeup_fd.side_effect = ValueError

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler_coroutine_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        async def simple_coroutine():
            pass

        # callback must not be a coroutine function
        coro_func = simple_coroutine
        coro_obj = coro_func()
        # Close the never-awaited coroutine object when the test ends.
        self.addCleanup(coro_obj.close)
        for func in (coro_func, coro_obj):
            self.assertRaisesRegex(
                TypeError, 'coroutines cannot be used with add_signal_handler',
                self.loop.add_signal_handler,
                signal.SIGINT, func)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler(self, m_signal):
        # The callback must end up wrapped in a Handle stored per signal.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        cb = lambda: True
        self.loop.add_signal_handler(signal.SIGHUP, cb)
        h = self.loop._signal_handlers.get(signal.SIGHUP)
        self.assertIsInstance(h, asyncio.Handle)
        self.assertEqual(h._callback, cb)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler_install_error(self, m_signal):
        # signal.signal() fails with a non-EINVAL OSError, and resetting the
        # wakeup fd (fd == -1) fails too: the original error must propagate.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)

    @mock.patch('asyncio.unix_events.signal')
    @mock.patch('asyncio.unix_events.logger')
    def test_add_signal_handler_install_error2(self, m_logging, m_signal):
        # EINVAL from signal.signal() must become RuntimeError.  Another
        # handler (SIGHUP) stays registered, so set_wakeup_fd() is expected
        # to be called only once.
        #
        # The logger is patched in asyncio.unix_events: mock.patch replaces
        # the name in the module that looks it up, and the code under test
        # lives in unix_events, not base_events.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.loop._signal_handlers[signal.SIGHUP] = lambda: True
        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(1, m_signal.set_wakeup_fd.call_count)

    @mock.patch('asyncio.unix_events.signal')
    @mock.patch('asyncio.unix_events.logger')
    def test_add_signal_handler_install_error3(self, m_logging, m_signal):
        # Same failure as above but with no other handler registered, so a
        # second set_wakeup_fd() call is expected.  The logger is patched in
        # asyncio.unix_events, the module whose code is exercised here.
        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(2, m_signal.set_wakeup_fd.call_count)

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler(self, m_signal):
        # Removing the handler must reinstall SIG_DFL for the signal.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGHUP))
        self.assertTrue(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler_2(self, m_signal):
        # SIGINT must get default_int_handler back, and the wakeup fd must
        # be left alone because a SIGHUP entry is still registered.
        m_signal.NSIG = signal.NSIG
        m_signal.SIGINT = signal.SIGINT
        m_signal.valid_signals = signal.valid_signals

        self.loop.add_signal_handler(signal.SIGINT, lambda: True)
        self.loop._signal_handlers[signal.SIGHUP] = object()
        m_signal.set_wakeup_fd.reset_mock()

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGINT))
        self.assertFalse(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGINT, m_signal.default_int_handler),
            m_signal.signal.call_args[0])

    @mock.patch('asyncio.unix_events.signal')
    @mock.patch('asyncio.unix_events.logger')
    def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
        # A failing set_wakeup_fd() during removal must be logged, not
        # raised.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.set_wakeup_fd.side_effect = ValueError

        self.loop.remove_signal_handler(signal.SIGHUP)
        # Check that the call happened: a bare Mock attribute is always
        # truthy, so asserting on ``m_logging.info`` itself proves nothing.
        self.assertTrue(m_logging.info.called)

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler_error(self, m_signal):
        # A plain OSError from signal.signal() must propagate.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.signal.side_effect = OSError

        self.assertRaises(
            OSError, self.loop.remove_signal_handler, signal.SIGHUP)

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler_error2(self, m_signal):
        # An OSError carrying EINVAL must be turned into RuntimeError.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.assertRaises(
            RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)

    @mock.patch('asyncio.unix_events.signal')
    def test_close(self, m_signal):
        # Closing the loop must drop every handler and reset the wakeup fd
        # exactly once.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
        self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)

        self.assertEqual(len(self.loop._signal_handlers), 2)

        # Forget the calls made while installing the handlers.
        m_signal.set_wakeup_fd.reset_mock()

        self.loop.close()

        self.assertEqual(len(self.loop._signal_handlers), 0)
        m_signal.set_wakeup_fd.assert_called_once_with(-1)

    @mock.patch('asyncio.unix_events.sys')
    @mock.patch('asyncio.unix_events.signal')
    def test_close_on_finalizing(self, m_signal, m_sys):
        # While the interpreter reports that it is finalizing, close() must
        # warn and clear the handlers without calling signal.signal().
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        self.assertEqual(len(self.loop._signal_handlers), 1)
        m_sys.is_finalizing.return_value = True
        m_signal.signal.reset_mock()

        with self.assertWarnsRegex(ResourceWarning,
                                   "skipping signal handlers removal"):
            self.loop.close()

        self.assertEqual(len(self.loop._signal_handlers), 0)
        self.assertFalse(m_signal.signal.called)
265
266
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
                     'UNIX Sockets are not supported')
class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
    # Argument validation and error handling of create_unix_server() and
    # create_unix_connection() on a real SelectorEventLoop.

    def setUp(self):
        super().setUp()
        self.loop = asyncio.SelectorEventLoop()
        self.set_event_loop(self.loop)

    @support.skip_unless_bind_unix_socket
    def test_create_unix_server_existing_path_sock(self):
        # A path left behind by an already closed UNIX socket must not
        # prevent creating a server on it.
        with test_utils.unix_socket_path() as path:
            sock = socket.socket(socket.AF_UNIX)
            sock.bind(path)
            sock.listen(1)
            sock.close()

            coro = self.loop.create_unix_server(lambda: None, path)
            srv = self.loop.run_until_complete(coro)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())

    @support.skip_unless_bind_unix_socket
    def test_create_unix_server_pathlib(self):
        # The path may be given as a pathlib.Path.
        with test_utils.unix_socket_path() as path:
            path = pathlib.Path(path)
            srv_coro = self.loop.create_unix_server(lambda: None, path)
            srv = self.loop.run_until_complete(srv_coro)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())

    def test_create_unix_connection_pathlib(self):
        with test_utils.unix_socket_path() as path:
            path = pathlib.Path(path)
            coro = self.loop.create_unix_connection(lambda: None, path)
            with self.assertRaises(FileNotFoundError):
                # If pathlib.Path wasn't supported, the exception would be
                # different.
                self.loop.run_until_complete(coro)

    def test_create_unix_server_existing_path_nonsock(self):
        # An existing regular file at the path must be reported as an
        # address that is already in use.
        with tempfile.NamedTemporaryFile() as file:
            coro = self.loop.create_unix_server(lambda: None, file.name)
            with self.assertRaisesRegex(OSError,
                                        'Address.*is already in use'):
                self.loop.run_until_complete(coro)

    def test_create_unix_server_ssl_bool(self):
        coro = self.loop.create_unix_server(lambda: None, path='spam',
                                            ssl=True)
        with self.assertRaisesRegex(TypeError,
                                    'ssl argument must be an SSLContext'):
            self.loop.run_until_complete(coro)

    def test_create_unix_server_nopath_nosock(self):
        coro = self.loop.create_unix_server(lambda: None, path=None)
        with self.assertRaisesRegex(ValueError,
                                    'path was not specified, and no sock'):
            self.loop.run_until_complete(coro)

    def test_create_unix_server_path_inetsock(self):
        # socket.socket() with no arguments is not a UNIX domain socket.
        sock = socket.socket()
        with sock:
            coro = self.loop.create_unix_server(lambda: None, path=None,
                                                sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UNIX Domain Stream.*was expected'):
                self.loop.run_until_complete(coro)

    def test_create_unix_server_path_dgram(self):
        # A UNIX datagram socket is not a stream socket.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        with sock:
            coro = self.loop.create_unix_server(lambda: None, path=None,
                                                sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UNIX Domain Stream.*was expected'):
                self.loop.run_until_complete(coro)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'no socket.SOCK_NONBLOCK (linux only)')
    @support.skip_unless_bind_unix_socket
    def test_create_unix_server_path_stream_bittype(self):
        # A stream socket created with SOCK_NONBLOCK or-ed into its type
        # must still be accepted.
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        # Only the name is kept from the temporary file; the socket is
        # bound to that name after the ``with`` block has ended.
        with tempfile.NamedTemporaryFile() as file:
            fn = file.name
        try:
            with sock:
                sock.bind(fn)
                coro = self.loop.create_unix_server(lambda: None, path=None,
                                                    sock=sock)
                srv = self.loop.run_until_complete(coro)
                srv.close()
                self.loop.run_until_complete(srv.wait_closed())
        finally:
            os.unlink(fn)

    def test_create_unix_server_ssl_timeout_with_plain_sock(self):
        coro = self.loop.create_unix_server(lambda: None, path='spam',
                                            ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_path_inetsock(self):
        sock = socket.socket()
        with sock:
            coro = self.loop.create_unix_connection(lambda: None,
                                                    sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UNIX Domain Stream.*was expected'):
                self.loop.run_until_complete(coro)

    @mock.patch('asyncio.unix_events.socket')
    def test_create_unix_server_bind_error(self, m_socket):
        # Ensure that the socket is closed on any bind error
        sock = mock.Mock()
        m_socket.socket.return_value = sock

        sock.bind.side_effect = OSError
        coro = self.loop.create_unix_server(lambda: None, path="/test")
        with self.assertRaises(OSError):
            self.loop.run_until_complete(coro)
        self.assertTrue(sock.close.called)

        # Forget the close() recorded above; otherwise the assertion for
        # the second scenario would pass even if close() were not called.
        sock.close.reset_mock()

        sock.bind.side_effect = MemoryError
        coro = self.loop.create_unix_server(lambda: None, path="/test")
        with self.assertRaises(MemoryError):
            self.loop.run_until_complete(coro)
        self.assertTrue(sock.close.called)

    def test_create_unix_connection_path_sock(self):
        # Passing both a path and a sock must be rejected.
        coro = self.loop.create_unix_connection(
            lambda: None, os.devnull, sock=object())
        with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_nopath_nosock(self):
        coro = self.loop.create_unix_connection(
            lambda: None, None)
        with self.assertRaisesRegex(ValueError,
                                    'no path and sock were specified'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_nossl_serverhost(self):
        coro = self.loop.create_unix_connection(
            lambda: None, os.devnull, server_hostname='spam')
        with self.assertRaisesRegex(ValueError,
                                    'server_hostname is only meaningful'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_ssl_noserverhost(self):
        coro = self.loop.create_unix_connection(
            lambda: None, os.devnull, ssl=True)

        with self.assertRaisesRegex(
            ValueError, 'you have to pass server_hostname when using ssl'):

            self.loop.run_until_complete(coro)

    def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
        coro = self.loop.create_unix_connection(lambda: None, path='spam',
                                                ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)
435
436
@unittest.skipUnless(hasattr(os, 'sendfile'),
                     'sendfile is not supported')
class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    # Tests for the loop's private helpers _sock_sendfile_native() and
    # _sock_sendfile_native_impl(), run against a connected pair of
    # non-blocking AF_INET sockets.

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        # Server-side protocol: records received bytes and exposes futures
        # for "connection made" (_ready) and "connection lost" (fut).

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            self.fut = loop.create_future()
            self.transport = None
            self._ready = loop.create_future()

        def connection_made(self, transport):
            self.started = True
            self.transport = transport
            self._ready.set_result(None)

        def data_received(self, data):
            self.data += data

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Write the payload to the shared test file once for the class.
        with open(support.TESTFN, 'wb') as payload_file:
            payload_file.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)
        source = open(support.TESTFN, 'rb')
        self.file = source
        self.addCleanup(source.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1024-byte buffers requested.

        When *cleanup* is true the socket is closed automatically at the end
        of the test.
        """
        new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        new_sock.setblocking(False)
        for option in (socket.SO_SNDBUF, socket.SO_RCVBUF):
            new_sock.setsockopt(socket.SOL_SOCKET, option, 1024)
        if cleanup:
            self.addCleanup(new_sock.close)
        return new_sock

    def run_loop(self, coro):
        """Run *coro* on the test loop until done and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a server, connect a client to it; return (socket, protocol).

        A cleanup is registered that closes the server-side transport, waits
        for connection_lost(), and then closes the server.
        """
        client = self.make_socket()
        proto = self.MyProto(self.loop)
        port = support.find_unused_port()
        # No cleanup for the listening socket: it is handed to the server.
        listener = self.make_socket(cleanup=False)
        listener.bind((support.HOST, port))
        server = self.run_loop(
            self.loop.create_server(lambda: proto, sock=listener))
        self.run_loop(self.loop.sock_connect(client, (support.HOST, port)))
        self.run_loop(proto._ready)

        def shutdown():
            proto.transport.close()
            self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(shutdown)

        return client, proto

    def _assert_not_in_selector(self, sock):
        # The selector must have no registration for *sock*.
        with self.assertRaises(KeyError):
            self.loop._selector.get_key(sock)

    def _native_send(self, file_obj, sock):
        # Run _sock_sendfile_native() for the whole file from offset 0.
        self.run_loop(
            self.loop._sock_sendfile_native(sock, file_obj, 0, None))

    def test_sock_sendfile_not_available(self):
        sock, _ = self.prepare()
        # spec=[] yields an ``os`` replacement without any attribute.
        with mock.patch('asyncio.unix_events.os', spec=[]), \
                self.assertRaisesRegex(
                    asyncio.SendfileNotAvailableError,
                    "os[.]sendfile[(][)] is not available"):
            self._native_send(self.file, sock)
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_a_file(self):
        sock, _ = self.prepare()
        plain_object = object()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self._native_send(plain_object, sock)
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_iobuffer(self):
        sock, _ = self.prepare()
        memory_file = io.BytesIO()
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self._native_send(memory_file, sock)
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_regular_file(self):
        sock, _ = self.prepare()
        fake_file = mock.Mock()
        fake_file.fileno.return_value = -1
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self._native_send(fake_file, sock)
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_cancel1(self):
        # Cancel the future after the first call, then let the loop run it.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        file_fd = self.file.fileno()
        total = len(self.DATA)
        self.loop._sock_sendfile_native_impl(future, None, sock, file_fd,
                                             0, None, total, 0)
        future.cancel()
        try:
            self.run_loop(future)
        except asyncio.CancelledError:
            pass
        self._assert_not_in_selector(sock)

    def test_sock_sendfile_cancel2(self):
        # Cancel the future, then invoke the implementation again with the
        # socket's fd as the registered descriptor.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        file_fd = self.file.fileno()
        total = len(self.DATA)
        self.loop._sock_sendfile_native_impl(future, None, sock, file_fd,
                                             0, None, total, 0)
        future.cancel()
        self.loop._sock_sendfile_native_impl(future, sock.fileno(), sock,
                                             file_fd, 0, None, total, 0)
        self._assert_not_in_selector(sock)

    def test_sock_sendfile_blocking_error(self):
        # When os.sendfile() would block, the socket must be registered
        # with the selector and a done callback attached to the future.
        sock, _ = self.prepare()

        file_fd = self.file.fileno()
        future = mock.Mock()
        future.cancelled.return_value = False
        with mock.patch('os.sendfile', side_effect=BlockingIOError()):
            self.loop._sock_sendfile_native_impl(future, None, sock, file_fd,
                                                 0, None, len(self.DATA), 0)
        self.assertIsNotNone(self.loop._selector.get_key(sock))
        future.add_done_callback.assert_called_once_with(mock.ANY)

    def test_sock_sendfile_os_error_first_call(self):
        # An OSError with nothing sent yet (total_sent == 0) must be
        # reported as SendfileNotAvailableError.
        sock, _ = self.prepare()

        file_fd = self.file.fileno()
        future = self.loop.create_future()
        with mock.patch('os.sendfile', side_effect=OSError()):
            self.loop._sock_sendfile_native_impl(future, None, sock, file_fd,
                                                 0, None, len(self.DATA), 0)
        self._assert_not_in_selector(sock)
        self.assertIsInstance(future.exception(),
                              asyncio.SendfileNotAvailableError)
        self.assertEqual(0, self.file.tell())

    def test_sock_sendfile_os_error_next_call(self):
        # With 1000 bytes passed as already sent, the OSError itself must
        # be stored on the future and the file position moved to the offset.
        sock, _ = self.prepare()

        file_fd = self.file.fileno()
        future = self.loop.create_future()
        failure = OSError()
        with mock.patch('os.sendfile', side_effect=failure):
            self.loop._sock_sendfile_native_impl(future, sock.fileno(),
                                                 sock, file_fd,
                                                 1000, None, len(self.DATA),
                                                 1000)
        self._assert_not_in_selector(sock)
        self.assertIs(future.exception(), failure)
        self.assertEqual(1000, self.file.tell())

    def test_sock_sendfile_exception(self):
        # An exception raised by os.sendfile() that is a
        # SendfileNotAvailableError must be stored on the future unchanged.
        sock, _ = self.prepare()

        file_fd = self.file.fileno()
        future = self.loop.create_future()
        failure = asyncio.SendfileNotAvailableError()
        with mock.patch('os.sendfile', side_effect=failure):
            self.loop._sock_sendfile_native_impl(future, sock.fileno(),
                                                 sock, file_fd,
                                                 1000, None, len(self.DATA),
                                                 1000)
        self._assert_not_in_selector(sock)
        self.assertIs(future.exception(), failure)
        self.assertEqual(1000, self.file.tell())
642
643
class UnixReadPipeTransportTests(test_utils.TestCase):
    # Tests for unix_events._UnixReadPipeTransport using a test loop, a
    # mocked protocol and a mocked pipe whose fileno() is 5.

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        set_blocking_patch = mock.patch('os.set_blocking')
        set_blocking_patch.start()
        self.addCleanup(set_blocking_patch.stop)

        # Make os.fstat() report a FIFO for whatever fd it is asked about.
        fstat_patch = mock.patch('os.fstat')
        fake_fstat = fstat_patch.start()
        fake_fstat.return_value = mock.Mock(st_mode=stat.S_IFIFO)
        self.addCleanup(fstat_patch.stop)

    def read_pipe_transport(self, waiter=None):
        """Create a read pipe transport over the mocked pipe and protocol.

        The transport's pipe is closed through close_pipe_transport() when
        the test ends.
        """
        new_transport = unix_events._UnixReadPipeTransport(
            self.loop, self.pipe, self.protocol, waiter=waiter)
        self.addCleanup(close_pipe_transport, new_transport)
        return new_transport

    def test_ctor(self):
        # Construction must call connection_made(), register the reader
        # callback for fd 5 and resolve the waiter with None.
        waiter = self.loop.create_future()
        transport = self.read_pipe_transport(waiter=waiter)
        self.loop.run_until_complete(waiter)

        self.protocol.connection_made.assert_called_with(transport)
        self.loop.assert_reader(5, transport._read_ready)
        self.assertIsNone(waiter.result())

    @mock.patch('os.read')
    def test__read_ready(self, m_read):
        # Bytes returned by os.read() must be handed to data_received().
        transport = self.read_pipe_transport()
        m_read.return_value = b'data'
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        self.protocol.data_received.assert_called_with(b'data')

    @mock.patch('os.read')
    def test__read_ready_eof(self, m_read):
        # An empty read must remove the reader and, after one loop
        # iteration, report eof_received() and connection_lost(None).
        transport = self.read_pipe_transport()
        m_read.return_value = b''
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.eof_received.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    @mock.patch('os.read')
    def test__read_ready_blocked(self, m_read):
        # BlockingIOError from os.read() must not deliver any data.
        transport = self.read_pipe_transport()
        m_read.side_effect = BlockingIOError
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.data_received.called)

    @mock.patch('asyncio.log.logger.error')
    @mock.patch('os.read')
    def test__read_ready_error(self, m_read, m_logexc):
        # Any other OSError must be logged and passed on to _close().
        transport = self.read_pipe_transport()
        failure = OSError()
        m_read.side_effect = failure
        transport._close = mock.Mock()
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        transport._close.assert_called_with(failure)
        expected_message = test_utils.MockPattern(
            'Fatal read error on pipe transport'
            '\nprotocol:.*\ntransport:.*')
        m_logexc.assert_called_with(
            expected_message,
            exc_info=(OSError, MOCK_ANY, MOCK_ANY))

    @mock.patch('os.read')
    def test_pause_reading(self, m_read):
        # pause_reading() must leave the loop without any reader.
        transport = self.read_pipe_transport()
        self.loop.add_reader(5, mock.Mock())
        transport.pause_reading()
        self.assertFalse(self.loop.readers)

    @mock.patch('os.read')
    def test_resume_reading(self, m_read):
        # resume_reading() after a pause must register the reader again.
        transport = self.read_pipe_transport()
        transport.pause_reading()
        transport.resume_reading()
        self.loop.assert_reader(5, transport._read_ready)

    @mock.patch('os.read')
    def test_close(self, m_read):
        # close() must delegate to _close(None).
        transport = self.read_pipe_transport()
        transport._close = mock.Mock()
        transport.close()
        transport._close.assert_called_with(None)

    @mock.patch('os.read')
    def test_close_already_closing(self, m_read):
        # close() on a transport flagged as closing must not call _close().
        transport = self.read_pipe_transport()
        transport._closing = True
        transport._close = mock.Mock()
        transport.close()
        self.assertFalse(transport._close.called)

    @mock.patch('os.read')
    def test__close(self, m_read):
        # _close(err) must mark the transport as closing, drop the reader
        # and, after one loop iteration, call connection_lost(err).
        transport = self.read_pipe_transport()
        reason = object()
        transport._close(reason)
        self.assertTrue(transport.is_closing())
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(reason)

    def test__call_connection_lost(self):
        # With no error: the protocol is notified, the pipe is closed and
        # the transport drops its protocol and loop references.
        transport = self.read_pipe_transport()
        self.assertIsNotNone(transport._protocol)
        self.assertIsNotNone(transport._loop)

        transport._call_connection_lost(None)
        self.protocol.connection_lost.assert_called_with(None)
        self.pipe.close.assert_called_with()

        self.assertIsNone(transport._protocol)
        self.assertIsNone(transport._loop)

    def test__call_connection_lost_with_err(self):
        # Same as above, but an OSError instance is passed through.
        transport = self.read_pipe_transport()
        self.assertIsNotNone(transport._protocol)
        self.assertIsNotNone(transport._loop)

        failure = OSError()
        transport._call_connection_lost(failure)
        self.protocol.connection_lost.assert_called_with(failure)
        self.pipe.close.assert_called_with()

        self.assertIsNone(transport._protocol)
        self.assertIsNone(transport._loop)

    def test_pause_reading_on_closed_pipe(self):
        # pause_reading() must not raise once the transport is fully closed.
        transport = self.read_pipe_transport()
        transport.close()
        test_utils.run_briefly(self.loop)
        self.assertIsNone(transport._loop)
        transport.pause_reading()

    def test_pause_reading_on_paused_pipe(self):
        transport = self.read_pipe_transport()
        transport.pause_reading()
        # the second call should do nothing
        transport.pause_reading()

    def test_resume_reading_on_closed_pipe(self):
        # resume_reading() must not raise once the transport is fully
        # closed.
        transport = self.read_pipe_transport()
        transport.close()
        test_utils.run_briefly(self.loop)
        self.assertIsNone(transport._loop)
        transport.resume_reading()

    def test_resume_reading_on_paused_pipe(self):
        transport = self.read_pipe_transport()
        # the pipe is not paused
        # resuming should do nothing
        transport.resume_reading()
819
820
class UnixWritePipeTransportTests(test_utils.TestCase):
    # Unit tests for unix_events._UnixWritePipeTransport.  They run against
    # a test loop and a mocked pipe object whose fileno() is 5;
    # os.set_blocking() and os.fstat() are patched for every test.

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        blocking_patcher = mock.patch('os.set_blocking')
        blocking_patcher.start()
        self.addCleanup(blocking_patcher.stop)

        # Make os.fstat() report the mocked descriptor as a socket.
        fstat_patcher = mock.patch('os.fstat')
        m_fstat = fstat_patcher.start()
        st = mock.Mock()
        st.st_mode = stat.S_IFSOCK
        m_fstat.return_value = st
        self.addCleanup(fstat_patcher.stop)

    def write_pipe_transport(self, waiter=None):
        # Build a transport around the mocked pipe and register a cleanup
        # that releases it without going through the (mocked) event loop.
        transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
                                                        self.protocol,
                                                        waiter=waiter)
        self.addCleanup(close_pipe_transport, transport)
        return transport

    def test_ctor(self):
        # Construction notifies the protocol, registers a reader on the
        # pipe and resolves the waiter with None.
        waiter = self.loop.create_future()
        tr = self.write_pipe_transport(waiter=waiter)
        self.loop.run_until_complete(waiter)

        self.protocol.connection_made.assert_called_with(tr)
        self.loop.assert_reader(5, tr._read_ready)
        self.assertIsNone(waiter.result())

    def test_can_write_eof(self):
        tr = self.write_pipe_transport()
        self.assertTrue(tr.can_write_eof())

    @mock.patch('os.write')
    def test_write(self, m_write):
        # os.write() accepts all four bytes: nothing is buffered and no
        # writer is registered.
        tr = self.write_pipe_transport()
        m_write.return_value = 4
        tr.write(b'data')
        m_write.assert_called_with(5, b'data')
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)

    @mock.patch('os.write')
    def test_write_no_data(self, m_write):
        # Writing an empty bytes object never reaches os.write().
        tr = self.write_pipe_transport()
        tr.write(b'')
        self.assertFalse(m_write.called)
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(b''), tr._buffer)

    @mock.patch('os.write')
    def test_write_partial(self, m_write):
        # os.write() accepts only two bytes: the remainder is buffered and
        # a writer is registered.
        tr = self.write_pipe_transport()
        m_write.return_value = 2
        tr.write(b'data')
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'ta'), tr._buffer)

    @mock.patch('os.write')
    def test_write_buffer(self, m_write):
        # With data already pending, write() only appends to the buffer.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'previous')
        tr.write(b'data')
        self.assertFalse(m_write.called)
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'previousdata'), tr._buffer)

    @mock.patch('os.write')
    def test_write_again(self, m_write):
        # os.write() raises BlockingIOError: everything is buffered and a
        # writer is registered.
        tr = self.write_pipe_transport()
        m_write.side_effect = BlockingIOError()
        tr.write(b'data')
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    @mock.patch('asyncio.unix_events.logger')
    @mock.patch('os.write')
    def test_write_err(self, m_write, m_log):
        # os.write() raises OSError: the error goes to _fatal_error(),
        # _conn_lost is incremented on each further write() and a warning
        # is eventually logged.
        tr = self.write_pipe_transport()
        err = OSError()
        m_write.side_effect = err
        tr._fatal_error = mock.Mock()
        tr.write(b'data')
        m_write.assert_called_with(5, b'data')
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)
        tr._fatal_error.assert_called_with(
                            err,
                            'Fatal write error on pipe transport')
        self.assertEqual(1, tr._conn_lost)

        tr.write(b'data')
        self.assertEqual(2, tr._conn_lost)
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        # This is a bit overspecified. :-(
        m_log.warning.assert_called_with(
            'pipe closed by peer or os.write(pipe, data) raised exception.')
        tr.close()

    @mock.patch('os.write')
    def test_write_close(self, m_write):
        # After the peer closed the pipe, every write() only bumps
        # _conn_lost.
        tr = self.write_pipe_transport()
        tr._read_ready()  # pipe was closed by peer

        tr.write(b'data')
        self.assertEqual(tr._conn_lost, 1)
        tr.write(b'data')
        self.assertEqual(tr._conn_lost, 2)

    def test__read_ready(self):
        # _read_ready() closes the transport: no reader or writer is left
        # and the protocol is told the connection was lost with None.
        tr = self.write_pipe_transport()
        tr._read_ready()
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)
        self.assertTrue(tr.is_closing())
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    @mock.patch('os.write')
    def test__write_ready(self, m_write):
        # The whole buffer is flushed: the writer is removed.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 4
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)

    @mock.patch('os.write')
    def test__write_ready_partial(self, m_write):
        # Only three bytes are flushed: the writer stays registered and
        # the last byte remains buffered.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 3
        tr._write_ready()
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'a'), tr._buffer)

    @mock.patch('os.write')
    def test__write_ready_again(self, m_write):
        # os.write() raises BlockingIOError: the buffer is left untouched.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.side_effect = BlockingIOError()
        tr._write_ready()
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    @mock.patch('os.write')
    def test__write_ready_empty(self, m_write):
        # os.write() returns 0: the buffer is left untouched.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 0
        tr._write_ready()
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    @mock.patch('asyncio.log.logger.error')
    @mock.patch('os.write')
    def test__write_ready_err(self, m_write, m_logexc):
        # os.write() raises OSError: the transport closes, the buffer is
        # dropped, nothing is logged through logger.error and the error
        # reaches protocol.connection_lost().
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.side_effect = err = OSError()
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertFalse(self.loop.readers)
        self.assertEqual(bytearray(), tr._buffer)
        self.assertTrue(tr.is_closing())
        m_logexc.assert_not_called()
        self.assertEqual(1, tr._conn_lost)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(err)

    @mock.patch('os.write')
    def test__write_ready_closing(self, m_write):
        # Flushing the last bytes of a closing transport finishes the
        # shutdown: connection_lost(None) is reported and the pipe closed.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._closing = True
        tr._buffer = bytearray(b'data')
        m_write.return_value = 4
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertFalse(self.loop.readers)
        self.assertEqual(bytearray(), tr._buffer)
        self.protocol.connection_lost.assert_called_with(None)
        self.pipe.close.assert_called_with()

    @mock.patch('os.write')
    def test_abort(self, m_write):
        # abort() discards pending data without writing it.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        self.loop.add_reader(5, tr._read_ready)
        # Use a bytearray like the other tests in this class do, so the
        # test exercises the same buffer type.
        tr._buffer = bytearray(b'data')
        tr.abort()
        self.assertFalse(m_write.called)
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)
        self.assertTrue(tr.is_closing())
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test__call_connection_lost(self):
        # Without an error: the protocol is notified with None, the pipe
        # is closed and the transport drops its protocol and loop.
        tr = self.write_pipe_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)

        err = None
        tr._call_connection_lost(err)
        self.protocol.connection_lost.assert_called_with(err)
        self.pipe.close.assert_called_with()

        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    def test__call_connection_lost_with_err(self):
        # Same as above, but an OSError instance is passed through.
        tr = self.write_pipe_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)

        err = OSError()
        tr._call_connection_lost(err)
        self.protocol.connection_lost.assert_called_with(err)
        self.pipe.close.assert_called_with()

        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    def test_close(self):
        # close() delegates to write_eof().
        tr = self.write_pipe_transport()
        tr.write_eof = mock.Mock()
        tr.close()
        tr.write_eof.assert_called_with()

        # closing the transport twice must not fail
        tr.close()

    def test_close_closing(self):
        # close() does not call write_eof() when already closing.
        tr = self.write_pipe_transport()
        tr.write_eof = mock.Mock()
        tr._closing = True
        tr.close()
        self.assertFalse(tr.write_eof.called)

    def test_write_eof(self):
        # With nothing buffered, write_eof() closes the transport and the
        # protocol is notified once the loop has run briefly.
        tr = self.write_pipe_transport()
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_write_eof_pending(self):
        # With data still buffered, write_eof() only marks the transport
        # as closing; connection_lost() is not called yet.
        tr = self.write_pipe_transport()
        # Use a bytearray like the other tests in this class do.
        tr._buffer = bytearray(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.protocol.connection_lost.called)
1095
1096
class AbstractChildWatcherTests(unittest.TestCase):

    def test_not_implemented(self):
        # Every method of the abstract watcher raises NotImplementedError.
        dummy = mock.Mock()
        watcher = asyncio.AbstractChildWatcher()

        with self.assertRaises(NotImplementedError):
            watcher.add_child_handler(dummy, dummy)
        with self.assertRaises(NotImplementedError):
            watcher.remove_child_handler(dummy)
        with self.assertRaises(NotImplementedError):
            watcher.attach_loop(dummy)
        with self.assertRaises(NotImplementedError):
            watcher.close()
        with self.assertRaises(NotImplementedError):
            watcher.is_active()
        with self.assertRaises(NotImplementedError):
            watcher.__enter__()
        with self.assertRaises(NotImplementedError):
            watcher.__exit__(dummy, dummy, dummy)
1116
1117
class BaseChildWatcherTests(unittest.TestCase):

    def test_not_implemented(self):
        # The base class leaves _do_waitpid() to its subclasses.
        dummy = mock.Mock()
        watcher = unix_events.BaseChildWatcher()
        with self.assertRaises(NotImplementedError):
            watcher._do_waitpid(dummy)
1125
1126
# Record of the five mocks that stand in for os.waitpid() and the os.W*
# status helpers while a test runs.
WaitPidMocks = collections.namedtuple(
    "WaitPidMocks",
    ["waitpid", "WIFEXITED", "WIFSIGNALED", "WEXITSTATUS", "WTERMSIG"],
)
1134
1135
1136class ChildWatcherTestsMixin:
1137
1138    ignore_warnings = mock.patch.object(log.logger, "warning")
1139
1140    def setUp(self):
1141        super().setUp()
1142        self.loop = self.new_test_loop()
1143        self.running = False
1144        self.zombies = {}
1145
1146        with mock.patch.object(
1147                self.loop, "add_signal_handler") as self.m_add_signal_handler:
1148            self.watcher = self.create_watcher()
1149            self.watcher.attach_loop(self.loop)
1150
1151    def waitpid(self, pid, flags):
1152        if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
1153            self.assertGreater(pid, 0)
1154        try:
1155            if pid < 0:
1156                return self.zombies.popitem()
1157            else:
1158                return pid, self.zombies.pop(pid)
1159        except KeyError:
1160            pass
1161        if self.running:
1162            return 0, 0
1163        else:
1164            raise ChildProcessError()
1165
1166    def add_zombie(self, pid, returncode):
1167        self.zombies[pid] = returncode + 32768
1168
1169    def WIFEXITED(self, status):
1170        return status >= 32768
1171
1172    def WIFSIGNALED(self, status):
1173        return 32700 < status < 32768
1174
1175    def WEXITSTATUS(self, status):
1176        self.assertTrue(self.WIFEXITED(status))
1177        return status - 32768
1178
1179    def WTERMSIG(self, status):
1180        self.assertTrue(self.WIFSIGNALED(status))
1181        return 32768 - status
1182
1183    def test_create_watcher(self):
1184        self.m_add_signal_handler.assert_called_once_with(
1185            signal.SIGCHLD, self.watcher._sig_chld)
1186
1187    def waitpid_mocks(func):
1188        def wrapped_func(self):
1189            def patch(target, wrapper):
1190                return mock.patch(target, wraps=wrapper,
1191                                  new_callable=mock.Mock)
1192
1193            with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
1194                 patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
1195                 patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
1196                 patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
1197                 patch('os.waitpid', self.waitpid) as m_waitpid:
1198                func(self, WaitPidMocks(m_waitpid,
1199                                        m_WIFEXITED, m_WIFSIGNALED,
1200                                        m_WEXITSTATUS, m_WTERMSIG,
1201                                        ))
1202        return wrapped_func
1203
1204    @waitpid_mocks
1205    def test_sigchld(self, m):
1206        # register a child
1207        callback = mock.Mock()
1208
1209        with self.watcher:
1210            self.running = True
1211            self.watcher.add_child_handler(42, callback, 9, 10, 14)
1212
1213        self.assertFalse(callback.called)
1214        self.assertFalse(m.WIFEXITED.called)
1215        self.assertFalse(m.WIFSIGNALED.called)
1216        self.assertFalse(m.WEXITSTATUS.called)
1217        self.assertFalse(m.WTERMSIG.called)
1218
1219        # child is running
1220        self.watcher._sig_chld()
1221
1222        self.assertFalse(callback.called)
1223        self.assertFalse(m.WIFEXITED.called)
1224        self.assertFalse(m.WIFSIGNALED.called)
1225        self.assertFalse(m.WEXITSTATUS.called)
1226        self.assertFalse(m.WTERMSIG.called)
1227
1228        # child terminates (returncode 12)
1229        self.running = False
1230        self.add_zombie(42, 12)
1231        self.watcher._sig_chld()
1232
1233        self.assertTrue(m.WIFEXITED.called)
1234        self.assertTrue(m.WEXITSTATUS.called)
1235        self.assertFalse(m.WTERMSIG.called)
1236        callback.assert_called_once_with(42, 12, 9, 10, 14)
1237
1238        m.WIFSIGNALED.reset_mock()
1239        m.WIFEXITED.reset_mock()
1240        m.WEXITSTATUS.reset_mock()
1241        callback.reset_mock()
1242
1243        # ensure that the child is effectively reaped
1244        self.add_zombie(42, 13)
1245        with self.ignore_warnings:
1246            self.watcher._sig_chld()
1247
1248        self.assertFalse(callback.called)
1249        self.assertFalse(m.WTERMSIG.called)
1250
1251        m.WIFSIGNALED.reset_mock()
1252        m.WIFEXITED.reset_mock()
1253        m.WEXITSTATUS.reset_mock()
1254
1255        # sigchld called again
1256        self.zombies.clear()
1257        self.watcher._sig_chld()
1258
1259        self.assertFalse(callback.called)
1260        self.assertFalse(m.WIFEXITED.called)
1261        self.assertFalse(m.WIFSIGNALED.called)
1262        self.assertFalse(m.WEXITSTATUS.called)
1263        self.assertFalse(m.WTERMSIG.called)
1264
1265    @waitpid_mocks
1266    def test_sigchld_two_children(self, m):
1267        callback1 = mock.Mock()
1268        callback2 = mock.Mock()
1269
1270        # register child 1
1271        with self.watcher:
1272            self.running = True
1273            self.watcher.add_child_handler(43, callback1, 7, 8)
1274
1275        self.assertFalse(callback1.called)
1276        self.assertFalse(callback2.called)
1277        self.assertFalse(m.WIFEXITED.called)
1278        self.assertFalse(m.WIFSIGNALED.called)
1279        self.assertFalse(m.WEXITSTATUS.called)
1280        self.assertFalse(m.WTERMSIG.called)
1281
1282        # register child 2
1283        with self.watcher:
1284            self.watcher.add_child_handler(44, callback2, 147, 18)
1285
1286        self.assertFalse(callback1.called)
1287        self.assertFalse(callback2.called)
1288        self.assertFalse(m.WIFEXITED.called)
1289        self.assertFalse(m.WIFSIGNALED.called)
1290        self.assertFalse(m.WEXITSTATUS.called)
1291        self.assertFalse(m.WTERMSIG.called)
1292
1293        # children are running
1294        self.watcher._sig_chld()
1295
1296        self.assertFalse(callback1.called)
1297        self.assertFalse(callback2.called)
1298        self.assertFalse(m.WIFEXITED.called)
1299        self.assertFalse(m.WIFSIGNALED.called)
1300        self.assertFalse(m.WEXITSTATUS.called)
1301        self.assertFalse(m.WTERMSIG.called)
1302
1303        # child 1 terminates (signal 3)
1304        self.add_zombie(43, -3)
1305        self.watcher._sig_chld()
1306
1307        callback1.assert_called_once_with(43, -3, 7, 8)
1308        self.assertFalse(callback2.called)
1309        self.assertTrue(m.WIFSIGNALED.called)
1310        self.assertFalse(m.WEXITSTATUS.called)
1311        self.assertTrue(m.WTERMSIG.called)
1312
1313        m.WIFSIGNALED.reset_mock()
1314        m.WIFEXITED.reset_mock()
1315        m.WTERMSIG.reset_mock()
1316        callback1.reset_mock()
1317
1318        # child 2 still running
1319        self.watcher._sig_chld()
1320
1321        self.assertFalse(callback1.called)
1322        self.assertFalse(callback2.called)
1323        self.assertFalse(m.WIFEXITED.called)
1324        self.assertFalse(m.WIFSIGNALED.called)
1325        self.assertFalse(m.WEXITSTATUS.called)
1326        self.assertFalse(m.WTERMSIG.called)
1327
1328        # child 2 terminates (code 108)
1329        self.add_zombie(44, 108)
1330        self.running = False
1331        self.watcher._sig_chld()
1332
1333        callback2.assert_called_once_with(44, 108, 147, 18)
1334        self.assertFalse(callback1.called)
1335        self.assertTrue(m.WIFEXITED.called)
1336        self.assertTrue(m.WEXITSTATUS.called)
1337        self.assertFalse(m.WTERMSIG.called)
1338
1339        m.WIFSIGNALED.reset_mock()
1340        m.WIFEXITED.reset_mock()
1341        m.WEXITSTATUS.reset_mock()
1342        callback2.reset_mock()
1343
1344        # ensure that the children are effectively reaped
1345        self.add_zombie(43, 14)
1346        self.add_zombie(44, 15)
1347        with self.ignore_warnings:
1348            self.watcher._sig_chld()
1349
1350        self.assertFalse(callback1.called)
1351        self.assertFalse(callback2.called)
1352        self.assertFalse(m.WTERMSIG.called)
1353
1354        m.WIFSIGNALED.reset_mock()
1355        m.WIFEXITED.reset_mock()
1356        m.WEXITSTATUS.reset_mock()
1357
1358        # sigchld called again
1359        self.zombies.clear()
1360        self.watcher._sig_chld()
1361
1362        self.assertFalse(callback1.called)
1363        self.assertFalse(callback2.called)
1364        self.assertFalse(m.WIFEXITED.called)
1365        self.assertFalse(m.WIFSIGNALED.called)
1366        self.assertFalse(m.WEXITSTATUS.called)
1367        self.assertFalse(m.WTERMSIG.called)
1368
1369    @waitpid_mocks
1370    def test_sigchld_two_children_terminating_together(self, m):
1371        callback1 = mock.Mock()
1372        callback2 = mock.Mock()
1373
1374        # register child 1
1375        with self.watcher:
1376            self.running = True
1377            self.watcher.add_child_handler(45, callback1, 17, 8)
1378
1379        self.assertFalse(callback1.called)
1380        self.assertFalse(callback2.called)
1381        self.assertFalse(m.WIFEXITED.called)
1382        self.assertFalse(m.WIFSIGNALED.called)
1383        self.assertFalse(m.WEXITSTATUS.called)
1384        self.assertFalse(m.WTERMSIG.called)
1385
1386        # register child 2
1387        with self.watcher:
1388            self.watcher.add_child_handler(46, callback2, 1147, 18)
1389
1390        self.assertFalse(callback1.called)
1391        self.assertFalse(callback2.called)
1392        self.assertFalse(m.WIFEXITED.called)
1393        self.assertFalse(m.WIFSIGNALED.called)
1394        self.assertFalse(m.WEXITSTATUS.called)
1395        self.assertFalse(m.WTERMSIG.called)
1396
1397        # children are running
1398        self.watcher._sig_chld()
1399
1400        self.assertFalse(callback1.called)
1401        self.assertFalse(callback2.called)
1402        self.assertFalse(m.WIFEXITED.called)
1403        self.assertFalse(m.WIFSIGNALED.called)
1404        self.assertFalse(m.WEXITSTATUS.called)
1405        self.assertFalse(m.WTERMSIG.called)
1406
1407        # child 1 terminates (code 78)
1408        # child 2 terminates (signal 5)
1409        self.add_zombie(45, 78)
1410        self.add_zombie(46, -5)
1411        self.running = False
1412        self.watcher._sig_chld()
1413
1414        callback1.assert_called_once_with(45, 78, 17, 8)
1415        callback2.assert_called_once_with(46, -5, 1147, 18)
1416        self.assertTrue(m.WIFSIGNALED.called)
1417        self.assertTrue(m.WIFEXITED.called)
1418        self.assertTrue(m.WEXITSTATUS.called)
1419        self.assertTrue(m.WTERMSIG.called)
1420
1421        m.WIFSIGNALED.reset_mock()
1422        m.WIFEXITED.reset_mock()
1423        m.WTERMSIG.reset_mock()
1424        m.WEXITSTATUS.reset_mock()
1425        callback1.reset_mock()
1426        callback2.reset_mock()
1427
1428        # ensure that the children are effectively reaped
1429        self.add_zombie(45, 14)
1430        self.add_zombie(46, 15)
1431        with self.ignore_warnings:
1432            self.watcher._sig_chld()
1433
1434        self.assertFalse(callback1.called)
1435        self.assertFalse(callback2.called)
1436        self.assertFalse(m.WTERMSIG.called)
1437
1438    @waitpid_mocks
1439    def test_sigchld_race_condition(self, m):
1440        # register a child
1441        callback = mock.Mock()
1442
1443        with self.watcher:
1444            # child terminates before being registered
1445            self.add_zombie(50, 4)
1446            self.watcher._sig_chld()
1447
1448            self.watcher.add_child_handler(50, callback, 1, 12)
1449
1450        callback.assert_called_once_with(50, 4, 1, 12)
1451        callback.reset_mock()
1452
1453        # ensure that the child is effectively reaped
1454        self.add_zombie(50, -1)
1455        with self.ignore_warnings:
1456            self.watcher._sig_chld()
1457
1458        self.assertFalse(callback.called)
1459
1460    @waitpid_mocks
1461    def test_sigchld_replace_handler(self, m):
1462        callback1 = mock.Mock()
1463        callback2 = mock.Mock()
1464
1465        # register a child
1466        with self.watcher:
1467            self.running = True
1468            self.watcher.add_child_handler(51, callback1, 19)
1469
1470        self.assertFalse(callback1.called)
1471        self.assertFalse(callback2.called)
1472        self.assertFalse(m.WIFEXITED.called)
1473        self.assertFalse(m.WIFSIGNALED.called)
1474        self.assertFalse(m.WEXITSTATUS.called)
1475        self.assertFalse(m.WTERMSIG.called)
1476
1477        # register the same child again
1478        with self.watcher:
1479            self.watcher.add_child_handler(51, callback2, 21)
1480
1481        self.assertFalse(callback1.called)
1482        self.assertFalse(callback2.called)
1483        self.assertFalse(m.WIFEXITED.called)
1484        self.assertFalse(m.WIFSIGNALED.called)
1485        self.assertFalse(m.WEXITSTATUS.called)
1486        self.assertFalse(m.WTERMSIG.called)
1487
1488        # child terminates (signal 8)
1489        self.running = False
1490        self.add_zombie(51, -8)
1491        self.watcher._sig_chld()
1492
1493        callback2.assert_called_once_with(51, -8, 21)
1494        self.assertFalse(callback1.called)
1495        self.assertTrue(m.WIFSIGNALED.called)
1496        self.assertFalse(m.WEXITSTATUS.called)
1497        self.assertTrue(m.WTERMSIG.called)
1498
1499        m.WIFSIGNALED.reset_mock()
1500        m.WIFEXITED.reset_mock()
1501        m.WTERMSIG.reset_mock()
1502        callback2.reset_mock()
1503
1504        # ensure that the child is effectively reaped
1505        self.add_zombie(51, 13)
1506        with self.ignore_warnings:
1507            self.watcher._sig_chld()
1508
1509        self.assertFalse(callback1.called)
1510        self.assertFalse(callback2.called)
1511        self.assertFalse(m.WTERMSIG.called)
1512
1513    @waitpid_mocks
1514    def test_sigchld_remove_handler(self, m):
1515        callback = mock.Mock()
1516
1517        # register a child
1518        with self.watcher:
1519            self.running = True
1520            self.watcher.add_child_handler(52, callback, 1984)
1521
1522        self.assertFalse(callback.called)
1523        self.assertFalse(m.WIFEXITED.called)
1524        self.assertFalse(m.WIFSIGNALED.called)
1525        self.assertFalse(m.WEXITSTATUS.called)
1526        self.assertFalse(m.WTERMSIG.called)
1527
1528        # unregister the child
1529        self.watcher.remove_child_handler(52)
1530
1531        self.assertFalse(callback.called)
1532        self.assertFalse(m.WIFEXITED.called)
1533        self.assertFalse(m.WIFSIGNALED.called)
1534        self.assertFalse(m.WEXITSTATUS.called)
1535        self.assertFalse(m.WTERMSIG.called)
1536
1537        # child terminates (code 99)
1538        self.running = False
1539        self.add_zombie(52, 99)
1540        with self.ignore_warnings:
1541            self.watcher._sig_chld()
1542
1543        self.assertFalse(callback.called)
1544
1545    @waitpid_mocks
1546    def test_sigchld_unknown_status(self, m):
1547        callback = mock.Mock()
1548
1549        # register a child
1550        with self.watcher:
1551            self.running = True
1552            self.watcher.add_child_handler(53, callback, -19)
1553
1554        self.assertFalse(callback.called)
1555        self.assertFalse(m.WIFEXITED.called)
1556        self.assertFalse(m.WIFSIGNALED.called)
1557        self.assertFalse(m.WEXITSTATUS.called)
1558        self.assertFalse(m.WTERMSIG.called)
1559
1560        # terminate with unknown status
1561        self.zombies[53] = 1178
1562        self.running = False
1563        self.watcher._sig_chld()
1564
1565        callback.assert_called_once_with(53, 1178, -19)
1566        self.assertTrue(m.WIFEXITED.called)
1567        self.assertTrue(m.WIFSIGNALED.called)
1568        self.assertFalse(m.WEXITSTATUS.called)
1569        self.assertFalse(m.WTERMSIG.called)
1570
1571        callback.reset_mock()
1572        m.WIFEXITED.reset_mock()
1573        m.WIFSIGNALED.reset_mock()
1574
1575        # ensure that the child is effectively reaped
1576        self.add_zombie(53, 101)
1577        with self.ignore_warnings:
1578            self.watcher._sig_chld()
1579
1580        self.assertFalse(callback.called)
1581
1582    @waitpid_mocks
1583    def test_remove_child_handler(self, m):
1584        callback1 = mock.Mock()
1585        callback2 = mock.Mock()
1586        callback3 = mock.Mock()
1587
1588        # register children
1589        with self.watcher:
1590            self.running = True
1591            self.watcher.add_child_handler(54, callback1, 1)
1592            self.watcher.add_child_handler(55, callback2, 2)
1593            self.watcher.add_child_handler(56, callback3, 3)
1594
1595        # remove child handler 1
1596        self.assertTrue(self.watcher.remove_child_handler(54))
1597
1598        # remove child handler 2 multiple times
1599        self.assertTrue(self.watcher.remove_child_handler(55))
1600        self.assertFalse(self.watcher.remove_child_handler(55))
1601        self.assertFalse(self.watcher.remove_child_handler(55))
1602
1603        # all children terminate
1604        self.add_zombie(54, 0)
1605        self.add_zombie(55, 1)
1606        self.add_zombie(56, 2)
1607        self.running = False
1608        with self.ignore_warnings:
1609            self.watcher._sig_chld()
1610
1611        self.assertFalse(callback1.called)
1612        self.assertFalse(callback2.called)
1613        callback3.assert_called_once_with(56, 2, 3)
1614
1615    @waitpid_mocks
1616    def test_sigchld_unhandled_exception(self, m):
1617        callback = mock.Mock()
1618
1619        # register a child
1620        with self.watcher:
1621            self.running = True
1622            self.watcher.add_child_handler(57, callback)
1623
1624        # raise an exception
1625        m.waitpid.side_effect = ValueError
1626
1627        with mock.patch.object(log.logger,
1628                               'error') as m_error:
1629
1630            self.assertEqual(self.watcher._sig_chld(), None)
1631            self.assertTrue(m_error.called)
1632
1633    @waitpid_mocks
1634    def test_sigchld_child_reaped_elsewhere(self, m):
1635        # register a child
1636        callback = mock.Mock()
1637
1638        with self.watcher:
1639            self.running = True
1640            self.watcher.add_child_handler(58, callback)
1641
1642        self.assertFalse(callback.called)
1643        self.assertFalse(m.WIFEXITED.called)
1644        self.assertFalse(m.WIFSIGNALED.called)
1645        self.assertFalse(m.WEXITSTATUS.called)
1646        self.assertFalse(m.WTERMSIG.called)
1647
1648        # child terminates
1649        self.running = False
1650        self.add_zombie(58, 4)
1651
1652        # waitpid is called elsewhere
1653        os.waitpid(58, os.WNOHANG)
1654
1655        m.waitpid.reset_mock()
1656
1657        # sigchld
1658        with self.ignore_warnings:
1659            self.watcher._sig_chld()
1660
1661        if isinstance(self.watcher, asyncio.FastChildWatcher):
1662            # here the FastChildWatche enters a deadlock
1663            # (there is no way to prevent it)
1664            self.assertFalse(callback.called)
1665        else:
1666            callback.assert_called_once_with(58, 255)
1667
1668    @waitpid_mocks
1669    def test_sigchld_unknown_pid_during_registration(self, m):
1670        # register two children
1671        callback1 = mock.Mock()
1672        callback2 = mock.Mock()
1673
1674        with self.ignore_warnings, self.watcher:
1675            self.running = True
1676            # child 1 terminates
1677            self.add_zombie(591, 7)
1678            # an unknown child terminates
1679            self.add_zombie(593, 17)
1680
1681            self.watcher._sig_chld()
1682
1683            self.watcher.add_child_handler(591, callback1)
1684            self.watcher.add_child_handler(592, callback2)
1685
1686        callback1.assert_called_once_with(591, 7)
1687        self.assertFalse(callback2.called)
1688
1689    @waitpid_mocks
1690    def test_set_loop(self, m):
1691        # register a child
1692        callback = mock.Mock()
1693
1694        with self.watcher:
1695            self.running = True
1696            self.watcher.add_child_handler(60, callback)
1697
1698        # attach a new loop
1699        old_loop = self.loop
1700        self.loop = self.new_test_loop()
1701        patch = mock.patch.object
1702
1703        with patch(old_loop, "remove_signal_handler") as m_old_remove, \
1704             patch(self.loop, "add_signal_handler") as m_new_add:
1705
1706            self.watcher.attach_loop(self.loop)
1707
1708            m_old_remove.assert_called_once_with(
1709                signal.SIGCHLD)
1710            m_new_add.assert_called_once_with(
1711                signal.SIGCHLD, self.watcher._sig_chld)
1712
1713        # child terminates
1714        self.running = False
1715        self.add_zombie(60, 9)
1716        self.watcher._sig_chld()
1717
1718        callback.assert_called_once_with(60, 9)
1719
1720    @waitpid_mocks
1721    def test_set_loop_race_condition(self, m):
1722        # register 3 children
1723        callback1 = mock.Mock()
1724        callback2 = mock.Mock()
1725        callback3 = mock.Mock()
1726
1727        with self.watcher:
1728            self.running = True
1729            self.watcher.add_child_handler(61, callback1)
1730            self.watcher.add_child_handler(62, callback2)
1731            self.watcher.add_child_handler(622, callback3)
1732
1733        # detach the loop
1734        old_loop = self.loop
1735        self.loop = None
1736
1737        with mock.patch.object(
1738                old_loop, "remove_signal_handler") as m_remove_signal_handler:
1739
1740            with self.assertWarnsRegex(
1741                    RuntimeWarning, 'A loop is being detached'):
1742                self.watcher.attach_loop(None)
1743
1744            m_remove_signal_handler.assert_called_once_with(
1745                signal.SIGCHLD)
1746
1747        # child 1 & 2 terminate
1748        self.add_zombie(61, 11)
1749        self.add_zombie(62, -5)
1750
1751        # SIGCHLD was not caught
1752        self.assertFalse(callback1.called)
1753        self.assertFalse(callback2.called)
1754        self.assertFalse(callback3.called)
1755
1756        # attach a new loop
1757        self.loop = self.new_test_loop()
1758
1759        with mock.patch.object(
1760                self.loop, "add_signal_handler") as m_add_signal_handler:
1761
1762            self.watcher.attach_loop(self.loop)
1763
1764            m_add_signal_handler.assert_called_once_with(
1765                signal.SIGCHLD, self.watcher._sig_chld)
1766            callback1.assert_called_once_with(61, 11)  # race condition!
1767            callback2.assert_called_once_with(62, -5)  # race condition!
1768            self.assertFalse(callback3.called)
1769
1770        callback1.reset_mock()
1771        callback2.reset_mock()
1772
1773        # child 3 terminates
1774        self.running = False
1775        self.add_zombie(622, 19)
1776        self.watcher._sig_chld()
1777
1778        self.assertFalse(callback1.called)
1779        self.assertFalse(callback2.called)
1780        callback3.assert_called_once_with(622, 19)
1781
1782    @waitpid_mocks
1783    def test_close(self, m):
1784        # register two children
1785        callback1 = mock.Mock()
1786
1787        with self.watcher:
1788            self.running = True
1789            # child 1 terminates
1790            self.add_zombie(63, 9)
1791            # other child terminates
1792            self.add_zombie(65, 18)
1793            self.watcher._sig_chld()
1794
1795            self.watcher.add_child_handler(63, callback1)
1796            self.watcher.add_child_handler(64, callback1)
1797
1798            self.assertEqual(len(self.watcher._callbacks), 1)
1799            if isinstance(self.watcher, asyncio.FastChildWatcher):
1800                self.assertEqual(len(self.watcher._zombies), 1)
1801
1802            with mock.patch.object(
1803                    self.loop,
1804                    "remove_signal_handler") as m_remove_signal_handler:
1805
1806                self.watcher.close()
1807
1808                m_remove_signal_handler.assert_called_once_with(
1809                    signal.SIGCHLD)
1810                self.assertFalse(self.watcher._callbacks)
1811                if isinstance(self.watcher, asyncio.FastChildWatcher):
1812                    self.assertFalse(self.watcher._zombies)
1813
1814
class SafeChildWatcherTests(ChildWatcherTestsMixin, test_utils.TestCase):
    # Runs the ChildWatcherTestsMixin tests against SafeChildWatcher.

    def create_watcher(self):
        watcher = asyncio.SafeChildWatcher()
        return watcher
1818
1819
class FastChildWatcherTests(ChildWatcherTestsMixin, test_utils.TestCase):
    # Runs the ChildWatcherTestsMixin tests against FastChildWatcher.

    def create_watcher(self):
        watcher = asyncio.FastChildWatcher()
        return watcher
1823
1824
class PolicyTests(unittest.TestCase):
    # Child watcher handling of the default event loop policy.

    def create_policy(self):
        # Each test works on its own policy instance.
        return asyncio.DefaultEventLoopPolicy()

    def test_get_default_child_watcher(self):
        policy = self.create_policy()
        self.assertIsNone(policy._watcher)

        # the first request creates the watcher ...
        watcher = policy.get_child_watcher()
        self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)

        self.assertIs(policy._watcher, watcher)

        # ... and later requests return the same object
        self.assertIs(watcher, policy.get_child_watcher())

    def test_get_child_watcher_after_set(self):
        policy = self.create_policy()
        watcher = asyncio.FastChildWatcher()

        policy.set_child_watcher(watcher)
        self.assertIs(policy._watcher, watcher)
        self.assertIs(watcher, policy.get_child_watcher())

    def test_get_child_watcher_thread(self):
        # An exception raised inside a threading.Thread target does not
        # reach the thread that started it, so a failing assertion in f()
        # would go unnoticed by the test runner.  Failures are therefore
        # collected here and re-raised in the main thread after join().
        errors = []

        def f():
            try:
                loop = policy.new_event_loop()
                policy.set_event_loop(loop)
                try:
                    self.assertIsInstance(policy.get_event_loop(),
                                          asyncio.AbstractEventLoop)
                    watcher = policy.get_child_watcher()

                    self.assertIsInstance(watcher,
                                          asyncio.SafeChildWatcher)
                    self.assertIsNone(watcher._loop)
                finally:
                    # close the loop even when an assertion failed
                    loop.close()
            except BaseException as exc:
                errors.append(exc)

        policy = self.create_policy()
        policy.set_child_watcher(asyncio.SafeChildWatcher())

        th = threading.Thread(target=f)
        th.start()
        th.join()

        if errors:
            raise errors[0]

    def test_child_watcher_replace_mainloop_existing(self):
        policy = self.create_policy()
        loop = policy.get_event_loop()
        # registered as cleanup so the loop is closed even if an
        # assertion below fails
        self.addCleanup(loop.close)

        # Explicitly setup SafeChildWatcher,
        # default ThreadedChildWatcher has no _loop property
        watcher = asyncio.SafeChildWatcher()
        policy.set_child_watcher(watcher)
        watcher.attach_loop(loop)

        self.assertIs(watcher._loop, loop)

        new_loop = policy.new_event_loop()
        self.addCleanup(new_loop.close)
        policy.set_event_loop(new_loop)

        # the watcher follows the policy's current loop
        self.assertIs(watcher._loop, new_loop)

        policy.set_event_loop(None)

        self.assertIsNone(watcher._loop)
1893
1894
class TestFunctional(unittest.TestCase):
    # Tests that run against a real event loop created per test.

    def setUp(self):
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)

    def tearDown(self):
        self.loop.close()
        asyncio.set_event_loop(None)

    def test_add_reader_invalid_argument(self):
        def noop():
            return None

        def invalid_file_object():
            return self.assertRaisesRegex(ValueError, r'Invalid file object')

        # registering a callback for something that is not a file object
        for register in (self.loop.add_reader, self.loop.add_writer):
            with invalid_file_object():
                register(object(), noop)

        # unregistering is rejected the same way
        for unregister in (self.loop.remove_reader, self.loop.remove_writer):
            with invalid_file_object():
                unregister(object())

    def test_add_reader_or_writer_transport_fd(self):
        def fd_in_use():
            return self.assertRaisesRegex(
                RuntimeError,
                r'File descriptor .* is used by transport')

        async def runner():
            transport, _protocol = await self.loop.create_connection(
                asyncio.Protocol, sock=rsock)

            try:
                def noop():
                    return None

                # every reader/writer call must be refused, whether the
                # socket object or its file descriptor number is passed
                operations = (
                    (self.loop.add_reader, (noop,)),
                    (self.loop.remove_reader, ()),
                    (self.loop.add_writer, (noop,)),
                    (self.loop.remove_writer, ()),
                )
                for operation, extra_args in operations:
                    for target in (rsock, rsock.fileno()):
                        with fd_in_use():
                            operation(target, *extra_args)

            finally:
                transport.close()

        rsock, wsock = socket.socketpair()
        try:
            self.loop.run_until_complete(runner())
        finally:
            rsock.close()
            wsock.close()
1963
1964
# Run the tests when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
1967