• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for unix_events.py."""
2
3import collections
4import contextlib
5import errno
6import io
7import os
8import pathlib
9import signal
10import socket
11import stat
12import sys
13import tempfile
14import threading
15import unittest
16from unittest import mock
17from test import support
18from test.support import socket_helper
19
20if sys.platform == 'win32':
21    raise unittest.SkipTest('UNIX only')
22
23
24import asyncio
25from asyncio import log
26from asyncio import unix_events
27from test.test_asyncio import utils as test_utils
28
29
30MOCK_ANY = mock.ANY
31
32
def tearDownModule():
    """Restore the default event loop policy after this module's tests."""
    # Passing None makes asyncio drop any installed policy and fall back to
    # its default one.
    restored_policy = None
    asyncio.set_event_loop_policy(restored_policy)
35
36
def close_pipe_transport(transport):
    """Close the pipe held by *transport* and detach it from the transport.

    transport.close() is deliberately not used: the event loop and the
    selector are mocked in the tests that call this helper.  Nothing
    happens when the transport no longer holds a pipe.
    """
    pipe = transport._pipe
    if pipe is not None:
        pipe.close()
        transport._pipe = None
44
45
@unittest.skipUnless(signal, 'Signals are not supported')
class SelectorEventLoopSignalTests(test_utils.TestCase):
    """Tests for the signal-handler API of asyncio.SelectorEventLoop.

    Most tests replace the ``signal`` module looked up by
    asyncio.unix_events with a mock so that failures of signal.signal()
    and signal.set_wakeup_fd() can be simulated.
    """

    def setUp(self):
        super().setUp()
        self.loop = asyncio.SelectorEventLoop()
        self.set_event_loop(self.loop)

    def test_check_signal(self):
        # A str is rejected with TypeError, an out-of-range number with
        # ValueError.
        self.assertRaises(
            TypeError, self.loop._check_signal, '1')
        self.assertRaises(
            ValueError, self.loop._check_signal, signal.NSIG + 1)

    def test_handle_signal_no_handler(self):
        # Nothing is registered for this signal number; the call only has
        # to return without raising.
        self.loop._handle_signal(signal.NSIG + 1)

    def test_handle_signal_cancelled_handler(self):
        # A cancelled handle is expected to be unregistered through
        # remove_signal_handler().
        h = asyncio.Handle(mock.Mock(), (),
                           loop=mock.Mock())
        h.cancel()
        self.loop._signal_handlers[signal.NSIG + 1] = h
        self.loop.remove_signal_handler = mock.Mock()
        self.loop._handle_signal(signal.NSIG + 1)
        self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler_setup_error(self, m_signal):
        # A ValueError from set_wakeup_fd() must surface as RuntimeError.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        m_signal.set_wakeup_fd.side_effect = ValueError

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler_coroutine_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        async def simple_coroutine():
            pass

        # callback must be neither a coroutine function nor a coroutine
        # object
        coro_func = simple_coroutine
        coro_obj = coro_func()
        # Close the never-awaited coroutine object explicitly.
        self.addCleanup(coro_obj.close)
        for func in (coro_func, coro_obj):
            self.assertRaisesRegex(
                TypeError, 'coroutines cannot be used with add_signal_handler',
                self.loop.add_signal_handler,
                signal.SIGINT, func)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler(self, m_signal):
        # The callback ends up wrapped in a Handle stored per signal.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        cb = lambda: True
        self.loop.add_signal_handler(signal.SIGHUP, cb)
        h = self.loop._signal_handlers.get(signal.SIGHUP)
        self.assertIsInstance(h, asyncio.Handle)
        self.assertEqual(h._callback, cb)

    @mock.patch('asyncio.unix_events.signal')
    def test_add_signal_handler_install_error(self, m_signal):
        # signal.signal() fails with an errno other than EINVAL while
        # set_wakeup_fd(-1) fails too: the original OSError subclass must
        # be the exception that propagates.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)

    # Patch the ``logger`` name that asyncio.unix_events itself looks up;
    # patching asyncio.base_events.logger would not intercept its calls
    # (see "Where to patch" in the unittest.mock documentation).
    @mock.patch('asyncio.unix_events.signal')
    @mock.patch('asyncio.unix_events.logger')
    def test_add_signal_handler_install_error2(self, m_logging, m_signal):
        # EINVAL from signal.signal() becomes RuntimeError.  A handler for
        # another signal is still registered, and set_wakeup_fd() is
        # expected to have been called exactly once.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.loop._signal_handlers[signal.SIGHUP] = lambda: True
        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(1, m_signal.set_wakeup_fd.call_count)

    # Same patch target rationale as for install_error2.
    @mock.patch('asyncio.unix_events.signal')
    @mock.patch('asyncio.unix_events.logger')
    def test_add_signal_handler_install_error3(self, m_logging, m_signal):
        # Same failure with no other handler registered: set_wakeup_fd()
        # is expected to have been called twice.
        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(2, m_signal.set_wakeup_fd.call_count)

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler(self, m_signal):
        # Removing the handler reinstalls SIG_DFL for the signal.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGHUP))
        self.assertTrue(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler_2(self, m_signal):
        # SIGINT gets default_int_handler back, and set_wakeup_fd() is not
        # touched while another handler is still registered.
        m_signal.NSIG = signal.NSIG
        m_signal.SIGINT = signal.SIGINT
        m_signal.valid_signals = signal.valid_signals

        self.loop.add_signal_handler(signal.SIGINT, lambda: True)
        self.loop._signal_handlers[signal.SIGHUP] = object()
        # Forget the call made while adding the handler.
        m_signal.set_wakeup_fd.reset_mock()

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGINT))
        self.assertFalse(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGINT, m_signal.default_int_handler),
            m_signal.signal.call_args[0])

    # Same patch target rationale as for install_error2.
    @mock.patch('asyncio.unix_events.signal')
    @mock.patch('asyncio.unix_events.logger')
    def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
        # A set_wakeup_fd() failure while removing the last handler must
        # be logged instead of raised.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.set_wakeup_fd.side_effect = ValueError

        self.loop.remove_signal_handler(signal.SIGHUP)
        # Check ``.called``: the Mock attribute itself is always truthy.
        self.assertTrue(m_logging.info.called)

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler_error(self, m_signal):
        # A plain OSError from signal.signal() propagates.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.signal.side_effect = OSError

        self.assertRaises(
            OSError, self.loop.remove_signal_handler, signal.SIGHUP)

    @mock.patch('asyncio.unix_events.signal')
    def test_remove_signal_handler_error2(self, m_signal):
        # EINVAL from signal.signal() is reported as RuntimeError.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.assertRaises(
            RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)

    @mock.patch('asyncio.unix_events.signal')
    def test_close(self, m_signal):
        # Closing the loop drops every handler and resets the wakeup fd
        # exactly once.
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals

        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
        self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)

        self.assertEqual(len(self.loop._signal_handlers), 2)

        # Only the calls made by close() are of interest below.
        m_signal.set_wakeup_fd.reset_mock()

        self.loop.close()

        self.assertEqual(len(self.loop._signal_handlers), 0)
        m_signal.set_wakeup_fd.assert_called_once_with(-1)

    @mock.patch('asyncio.unix_events.sys')
    @mock.patch('asyncio.unix_events.signal')
    def test_close_on_finalizing(self, m_signal, m_sys):
        # While the interpreter reports that it is finalizing, close()
        # must warn and forget the handlers without calling
        # signal.signal().
        m_signal.NSIG = signal.NSIG
        m_signal.valid_signals = signal.valid_signals
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        self.assertEqual(len(self.loop._signal_handlers), 1)
        m_sys.is_finalizing.return_value = True
        m_signal.signal.reset_mock()

        with self.assertWarnsRegex(ResourceWarning,
                                   "skipping signal handlers removal"):
            self.loop.close()

        self.assertEqual(len(self.loop._signal_handlers), 0)
        self.assertFalse(m_signal.signal.called)
266
267
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
                     'UNIX Sockets are not supported')
class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
    """Tests for create_unix_server() and create_unix_connection().

    Mostly argument validation and error paths; a few tests bind a real
    UNIX socket on a temporary path.
    """

    def setUp(self):
        super().setUp()
        self.loop = asyncio.SelectorEventLoop()
        self.set_event_loop(self.loop)

    @socket_helper.skip_unless_bind_unix_socket
    def test_create_unix_server_existing_path_sock(self):
        # The path is left behind by an already closed socket; creating a
        # server on it must still succeed.
        with test_utils.unix_socket_path() as path:
            sock = socket.socket(socket.AF_UNIX)
            sock.bind(path)
            sock.listen(1)
            sock.close()

            coro = self.loop.create_unix_server(lambda: None, path)
            srv = self.loop.run_until_complete(coro)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())

    @socket_helper.skip_unless_bind_unix_socket
    def test_create_unix_server_pathlib(self):
        # The path may be given as a pathlib.Path.
        with test_utils.unix_socket_path() as path:
            path = pathlib.Path(path)
            srv_coro = self.loop.create_unix_server(lambda: None, path)
            srv = self.loop.run_until_complete(srv_coro)
            srv.close()
            self.loop.run_until_complete(srv.wait_closed())

    def test_create_unix_connection_pathlib(self):
        with test_utils.unix_socket_path() as path:
            path = pathlib.Path(path)
            coro = self.loop.create_unix_connection(lambda: None, path)
            with self.assertRaises(FileNotFoundError):
                # If pathlib.Path wasn't supported, the exception would be
                # different.
                self.loop.run_until_complete(coro)

    def test_create_unix_server_existing_path_nonsock(self):
        # The path exists but is a regular file, not a socket.
        with tempfile.NamedTemporaryFile() as file:
            coro = self.loop.create_unix_server(lambda: None, file.name)
            with self.assertRaisesRegex(OSError,
                                        'Address.*is already in use'):
                self.loop.run_until_complete(coro)

    def test_create_unix_server_ssl_bool(self):
        # ssl=True is not accepted in place of an SSLContext.
        coro = self.loop.create_unix_server(lambda: None, path='spam',
                                            ssl=True)
        with self.assertRaisesRegex(TypeError,
                                    'ssl argument must be an SSLContext'):
            self.loop.run_until_complete(coro)

    def test_create_unix_server_nopath_nosock(self):
        # Neither a path nor a socket is given.
        coro = self.loop.create_unix_server(lambda: None, path=None)
        with self.assertRaisesRegex(ValueError,
                                    'path was not specified, and no sock'):
            self.loop.run_until_complete(coro)

    def test_create_unix_server_path_inetsock(self):
        # socket.socket() with default arguments is not a UNIX socket.
        sock = socket.socket()
        with sock:
            coro = self.loop.create_unix_server(lambda: None, path=None,
                                                sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UNIX Domain Stream.*was expected'):
                self.loop.run_until_complete(coro)

    def test_create_unix_server_path_dgram(self):
        # A UNIX datagram socket is rejected as well.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        with sock:
            coro = self.loop.create_unix_server(lambda: None, path=None,
                                                sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UNIX Domain Stream.*was expected'):
                self.loop.run_until_complete(coro)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'no socket.SOCK_NONBLOCK (linux only)')
    @socket_helper.skip_unless_bind_unix_socket
    def test_create_unix_server_path_stream_bittype(self):
        # A stream socket created with SOCK_NONBLOCK or-ed into its type
        # must still be accepted.
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        # Only the unique name is needed; the temporary file itself is
        # gone once the with-block exits.
        with tempfile.NamedTemporaryFile() as file:
            fn = file.name
        try:
            with sock:
                sock.bind(fn)
                coro = self.loop.create_unix_server(lambda: None, path=None,
                                                    sock=sock)
                srv = self.loop.run_until_complete(coro)
                srv.close()
                self.loop.run_until_complete(srv.wait_closed())
        finally:
            os.unlink(fn)

    def test_create_unix_server_ssl_timeout_with_plain_sock(self):
        # ssl_handshake_timeout without ssl is an error.
        coro = self.loop.create_unix_server(lambda: None, path='spam',
                                            ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_path_inetsock(self):
        # socket.socket() with default arguments is not a UNIX socket.
        sock = socket.socket()
        with sock:
            coro = self.loop.create_unix_connection(lambda: None,
                                                    sock=sock)
            with self.assertRaisesRegex(ValueError,
                                        'A UNIX Domain Stream.*was expected'):
                self.loop.run_until_complete(coro)

    @mock.patch('asyncio.unix_events.socket')
    def test_create_unix_server_bind_error(self, m_socket):
        # Ensure that the socket is closed on any bind error
        sock = mock.Mock()
        m_socket.socket.return_value = sock

        sock.bind.side_effect = OSError
        coro = self.loop.create_unix_server(lambda: None, path="/test")
        with self.assertRaises(OSError):
            self.loop.run_until_complete(coro)
        self.assertTrue(sock.close.called)

        # Forget the close() recorded above; otherwise the check after the
        # second failure would pass no matter what happened.
        sock.close.reset_mock()

        sock.bind.side_effect = MemoryError
        coro = self.loop.create_unix_server(lambda: None, path="/test")
        with self.assertRaises(MemoryError):
            self.loop.run_until_complete(coro)
        self.assertTrue(sock.close.called)

    def test_create_unix_connection_path_sock(self):
        # path and sock are mutually exclusive.
        coro = self.loop.create_unix_connection(
            lambda: None, os.devnull, sock=object())
        with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_nopath_nosock(self):
        # Neither a path nor a socket is given.
        coro = self.loop.create_unix_connection(
            lambda: None, None)
        with self.assertRaisesRegex(ValueError,
                                    'no path and sock were specified'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_nossl_serverhost(self):
        # server_hostname without ssl is an error.
        coro = self.loop.create_unix_connection(
            lambda: None, os.devnull, server_hostname='spam')
        with self.assertRaisesRegex(ValueError,
                                    'server_hostname is only meaningful'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_ssl_noserverhost(self):
        # ssl without server_hostname is an error.
        coro = self.loop.create_unix_connection(
            lambda: None, os.devnull, ssl=True)

        with self.assertRaisesRegex(
                ValueError,
                'you have to pass server_hostname when using ssl'):
            self.loop.run_until_complete(coro)

    def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
        # ssl_handshake_timeout without ssl is an error.
        coro = self.loop.create_unix_connection(lambda: None, path='spam',
                                                ssl_handshake_timeout=1)
        with self.assertRaisesRegex(
                ValueError,
                'ssl_handshake_timeout is only meaningful with ssl'):
            self.loop.run_until_complete(coro)
436
437
@unittest.skipUnless(hasattr(os, 'sendfile'),
                     'sendfile is not supported')
class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    """Tests for the loop's private native (os.sendfile) sendfile helpers.

    A file holding DATA is written once per class; each test opens it for
    reading and connects a client socket to a local server.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Server-side protocol that records the events it receives."""

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Resolved by connection_lost().
            self.fut = loop.create_future()
            self.transport = None
            # Resolved by connection_made().
            self._ready = loop.create_future()

        def connection_made(self, transport):
            self.started = True
            self.transport = transport
            self._ready.set_result(None)

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Write the payload file shared by all tests of the class.
        with open(support.TESTFN, 'wb') as payload_file:
            payload_file.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1024-byte buffer options.

        With *cleanup* true, closing the socket is scheduled as a test
        cleanup.
        """
        new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        new_sock.setblocking(False)
        for buffer_option in (socket.SO_SNDBUF, socket.SO_RCVBUF):
            new_sock.setsockopt(socket.SOL_SOCKET, buffer_option, 1024)
        if cleanup:
            self.addCleanup(new_sock.close)
        return new_sock

    def run_loop(self, coro):
        """Run *coro* on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Return ``(client_socket, server_protocol)`` for a live connection.

        Closing the server-side transport and the server is registered as
        a test cleanup.
        """
        client_sock = self.make_socket()
        proto = self.MyProto(self.loop)
        address = (socket_helper.HOST, socket_helper.find_unused_port())
        # No cleanup for the listening socket: it is handed to the server
        # created right below.
        listener = self.make_socket(cleanup=False)
        listener.bind(address)
        server = self.run_loop(
            self.loop.create_server(lambda: proto, sock=listener))
        self.run_loop(self.loop.sock_connect(client_sock, address))
        self.run_loop(proto._ready)

        def cleanup():
            proto.transport.close()
            self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(cleanup)

        return client_sock, proto

    def _call_native_impl(self, fut, registered_fd, sock,
                          offset=0, total_sent=0):
        # Invoke the private implementation on the test file, with no
        # count and len(DATA) as block size.
        self.loop._sock_sendfile_native_impl(
            fut, registered_fd, sock, self.file.fileno(),
            offset, None, len(self.DATA), total_sent)

    def _check_not_registered(self, sock):
        # The selector must not know about *sock*.
        with self.assertRaises(KeyError):
            self.loop._selector.get_key(sock)

    def _check_not_regular_file(self, sock, fileobj):
        # *fileobj* must be refused and the real test file left unread.
        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(
                self.loop._sock_sendfile_native(sock, fileobj, 0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_available(self):
        sock, _ = self.prepare()
        # spec=[] leaves the patched ``os`` without any attribute.
        with mock.patch('asyncio.unix_events.os', spec=[]):
            with self.assertRaisesRegex(
                    asyncio.SendfileNotAvailableError,
                    "os[.]sendfile[(][)] is not available"):
                self.run_loop(
                    self.loop._sock_sendfile_native(sock, self.file,
                                                    0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_a_file(self):
        sock, _ = self.prepare()
        self._check_not_regular_file(sock, object())

    def test_sock_sendfile_iobuffer(self):
        sock, _ = self.prepare()
        self._check_not_regular_file(sock, io.BytesIO())

    def test_sock_sendfile_not_regular_file(self):
        sock, _ = self.prepare()
        fake_file = mock.Mock()
        fake_file.fileno.return_value = -1
        self._check_not_regular_file(sock, fake_file)

    def test_sock_sendfile_cancel1(self):
        # Cancel the future, then let the loop process the cancellation.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        self._call_native_impl(future, None, sock)
        future.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            self.run_loop(future)
        self._check_not_registered(sock)

    def test_sock_sendfile_cancel2(self):
        # Call the implementation again with an already cancelled future.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        self._call_native_impl(future, None, sock)
        future.cancel()
        self._call_native_impl(future, sock.fileno(), sock)
        self._check_not_registered(sock)

    def test_sock_sendfile_blocking_error(self):
        # BlockingIOError: the socket gets registered with the selector
        # and a done callback is attached to the future.
        sock, _ = self.prepare()

        future = mock.Mock()
        future.cancelled.return_value = False
        with mock.patch('os.sendfile', side_effect=BlockingIOError()):
            self._call_native_impl(future, None, sock)
        selector_key = self.loop._selector.get_key(sock)
        self.assertIsNotNone(selector_key)
        future.add_done_callback.assert_called_once_with(mock.ANY)

    def test_sock_sendfile_os_error_first_call(self):
        # OSError with nothing sent yet is reported as
        # SendfileNotAvailableError.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        with mock.patch('os.sendfile', side_effect=OSError()):
            self._call_native_impl(future, None, sock)
        self._check_not_registered(sock)
        failure = future.exception()
        self.assertIsInstance(failure, asyncio.SendfileNotAvailableError)
        self.assertEqual(0, self.file.tell())

    def test_sock_sendfile_os_error_next_call(self):
        # OSError after 1000 bytes were already sent is passed through.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        error = OSError()
        with mock.patch('os.sendfile', side_effect=error):
            self._call_native_impl(future, sock.fileno(), sock,
                                   offset=1000, total_sent=1000)
        self._check_not_registered(sock)
        self.assertIs(future.exception(), error)
        self.assertEqual(1000, self.file.tell())

    def test_sock_sendfile_exception(self):
        # Any other exception is stored on the future as is.
        sock, _ = self.prepare()

        future = self.loop.create_future()
        error = asyncio.SendfileNotAvailableError()
        with mock.patch('os.sendfile', side_effect=error):
            self._call_native_impl(future, sock.fileno(), sock,
                                   offset=1000, total_sent=1000)
        self._check_not_registered(sock)
        self.assertIs(future.exception(), error)
        self.assertEqual(1000, self.file.tell())
643
644
class UnixReadPipeTransportTests(test_utils.TestCase):
    """Tests for unix_events._UnixReadPipeTransport on a mocked pipe.

    The pipe is a Mock whose fileno() returns 5; os.set_blocking() and
    os.fstat() are patched for the duration of each test.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        set_blocking_patch = mock.patch('os.set_blocking')
        set_blocking_patch.start()
        self.addCleanup(set_blocking_patch.stop)

        # Make os.fstat() describe the mocked descriptor as a FIFO.
        fstat_patch = mock.patch('os.fstat')
        fstat_patch.start().return_value = mock.Mock(st_mode=stat.S_IFIFO)
        self.addCleanup(fstat_patch.stop)

    def read_pipe_transport(self, waiter=None):
        """Create a read transport on the mocked pipe.

        Releasing the pipe is scheduled as a test cleanup.
        """
        pipe_transport = unix_events._UnixReadPipeTransport(
            self.loop, self.pipe, self.protocol, waiter=waiter)
        self.addCleanup(close_pipe_transport, pipe_transport)
        return pipe_transport

    def _closed_transport(self):
        # Return a transport whose close() has been fully processed.
        transport = self.read_pipe_transport()
        transport.close()
        test_utils.run_briefly(self.loop)
        self.assertIsNone(transport._loop)
        return transport

    def _check_call_connection_lost(self, exc):
        # _call_connection_lost(exc) must notify the protocol, close the
        # pipe and drop the protocol and loop references.
        transport = self.read_pipe_transport()
        self.assertIsNotNone(transport._protocol)
        self.assertIsNotNone(transport._loop)

        transport._call_connection_lost(exc)
        self.protocol.connection_lost.assert_called_with(exc)
        self.pipe.close.assert_called_with()

        self.assertIsNone(transport._protocol)
        self.assertIsNone(transport._loop)

    def test_ctor(self):
        ready = self.loop.create_future()
        transport = self.read_pipe_transport(waiter=ready)
        self.loop.run_until_complete(ready)

        self.protocol.connection_made.assert_called_with(transport)
        self.loop.assert_reader(5, transport._read_ready)
        self.assertIsNone(ready.result())

    @mock.patch('os.read')
    def test__read_ready(self, m_read):
        transport = self.read_pipe_transport()
        m_read.return_value = b'data'
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        self.protocol.data_received.assert_called_with(b'data')

    @mock.patch('os.read')
    def test__read_ready_eof(self, m_read):
        # An empty read means EOF: the reader is removed and the protocol
        # is told about EOF and the lost connection.
        transport = self.read_pipe_transport()
        m_read.return_value = b''
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.eof_received.assert_called_with()
        self.protocol.connection_lost.assert_called_with(None)

    @mock.patch('os.read')
    def test__read_ready_blocked(self, m_read):
        # BlockingIOError must not deliver any data.
        transport = self.read_pipe_transport()
        m_read.side_effect = BlockingIOError
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.data_received.called)

    @mock.patch('asyncio.log.logger.error')
    @mock.patch('os.read')
    def test__read_ready_error(self, m_read, m_logexc):
        # Any other OSError is logged and closes the transport.
        transport = self.read_pipe_transport()
        read_error = OSError()
        m_read.side_effect = read_error
        transport._close = mock.Mock()
        transport._read_ready()

        m_read.assert_called_with(5, transport.max_size)
        transport._close.assert_called_with(read_error)
        expected_message = test_utils.MockPattern(
            'Fatal read error on pipe transport'
            '\nprotocol:.*\ntransport:.*')
        m_logexc.assert_called_with(
            expected_message,
            exc_info=(OSError, MOCK_ANY, MOCK_ANY))

    @mock.patch('os.read')
    def test_pause_reading(self, m_read):
        transport = self.read_pipe_transport()
        self.loop.add_reader(5, mock.Mock())
        transport.pause_reading()
        self.assertFalse(self.loop.readers)

    @mock.patch('os.read')
    def test_resume_reading(self, m_read):
        transport = self.read_pipe_transport()
        transport.pause_reading()
        transport.resume_reading()
        self.loop.assert_reader(5, transport._read_ready)

    @mock.patch('os.read')
    def test_close(self, m_read):
        transport = self.read_pipe_transport()
        transport._close = mock.Mock()
        transport.close()
        transport._close.assert_called_with(None)

    @mock.patch('os.read')
    def test_close_already_closing(self, m_read):
        # close() on a transport already marked as closing is a no-op.
        transport = self.read_pipe_transport()
        transport._closing = True
        transport._close = mock.Mock()
        transport.close()
        self.assertFalse(transport._close.called)

    @mock.patch('os.read')
    def test__close(self, m_read):
        transport = self.read_pipe_transport()
        reason = object()
        transport._close(reason)
        self.assertTrue(transport.is_closing())
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(reason)

    def test__call_connection_lost(self):
        self._check_call_connection_lost(None)

    def test__call_connection_lost_with_err(self):
        self._check_call_connection_lost(OSError())

    def test_pause_reading_on_closed_pipe(self):
        self._closed_transport().pause_reading()

    def test_pause_reading_on_paused_pipe(self):
        transport = self.read_pipe_transport()
        transport.pause_reading()
        # the second call should do nothing
        transport.pause_reading()

    def test_resume_reading_on_closed_pipe(self):
        self._closed_transport().resume_reading()

    def test_resume_reading_on_paused_pipe(self):
        transport = self.read_pipe_transport()
        # the pipe is not paused
        # resuming should do nothing
        transport.resume_reading()
820
821
class UnixWritePipeTransportTests(test_utils.TestCase):
    """Unit tests for unix_events._UnixWritePipeTransport.

    The transport is driven against a test loop and a mocked pipe whose
    fileno() is 5.  os.write is patched in every test that writes, so no
    real I/O is performed.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # The pipe is a mock, so descriptor 5 is not owned by the test;
        # os.set_blocking is patched out for the duration of each test.
        blocking_patcher = mock.patch('os.set_blocking')
        blocking_patcher.start()
        self.addCleanup(blocking_patcher.stop)

        # os.fstat is patched to report a socket (S_IFSOCK) for any fd.
        fstat_patcher = mock.patch('os.fstat')
        m_fstat = fstat_patcher.start()
        st = mock.Mock()
        st.st_mode = stat.S_IFSOCK
        m_fstat.return_value = st
        self.addCleanup(fstat_patcher.stop)

    def write_pipe_transport(self, waiter=None):
        # Build a transport around the mocked pipe and make sure it is
        # torn down (without touching the mocked loop) when the test ends.
        transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
                                                        self.protocol,
                                                        waiter=waiter)
        self.addCleanup(close_pipe_transport, transport)
        return transport

    def test_ctor(self):
        waiter = self.loop.create_future()
        tr = self.write_pipe_transport(waiter=waiter)
        self.loop.run_until_complete(waiter)

        self.protocol.connection_made.assert_called_with(tr)
        self.loop.assert_reader(5, tr._read_ready)
        self.assertIsNone(waiter.result())

    def test_can_write_eof(self):
        tr = self.write_pipe_transport()
        self.assertTrue(tr.can_write_eof())

    @mock.patch('os.write')
    def test_write(self, m_write):
        # All 4 bytes are accepted at once: nothing is buffered and no
        # writer is registered.
        tr = self.write_pipe_transport()
        m_write.return_value = 4
        tr.write(b'data')
        m_write.assert_called_with(5, b'data')
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)

    @mock.patch('os.write')
    def test_write_no_data(self, m_write):
        # Writing an empty bytes object must not reach os.write at all.
        tr = self.write_pipe_transport()
        tr.write(b'')
        self.assertFalse(m_write.called)
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(b''), tr._buffer)

    @mock.patch('os.write')
    def test_write_partial(self, m_write):
        # Only 2 of 4 bytes are accepted: the rest is buffered and a
        # writer callback is registered for fd 5.
        tr = self.write_pipe_transport()
        m_write.return_value = 2
        tr.write(b'data')
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'ta'), tr._buffer)

    @mock.patch('os.write')
    def test_write_buffer(self, m_write):
        # With data already pending, write() only appends to the buffer.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'previous')
        tr.write(b'data')
        self.assertFalse(m_write.called)
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'previousdata'), tr._buffer)

    @mock.patch('os.write')
    def test_write_again(self, m_write):
        # BlockingIOError from os.write: everything is buffered for later.
        tr = self.write_pipe_transport()
        m_write.side_effect = BlockingIOError()
        tr.write(b'data')
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    @mock.patch('asyncio.unix_events.logger')
    @mock.patch('os.write')
    def test_write_err(self, m_write, m_log):
        tr = self.write_pipe_transport()
        err = OSError()
        m_write.side_effect = err
        # _fatal_error is replaced so the test can observe the call.
        tr._fatal_error = mock.Mock()
        tr.write(b'data')
        m_write.assert_called_with(5, b'data')
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)
        tr._fatal_error.assert_called_with(
                            err,
                            'Fatal write error on pipe transport')
        self.assertEqual(1, tr._conn_lost)

        # Every further write on the lost connection bumps _conn_lost.
        tr.write(b'data')
        self.assertEqual(2, tr._conn_lost)
        # Keep writing until the transport logs its warning (the exact
        # threshold lives in asyncio and is not visible here).
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        tr.write(b'data')
        # This is a bit overspecified. :-(
        m_log.warning.assert_called_with(
            'pipe closed by peer or os.write(pipe, data) raised exception.')
        tr.close()

    @mock.patch('os.write')
    def test_write_close(self, m_write):
        tr = self.write_pipe_transport()
        tr._read_ready()  # pipe was closed by peer

        tr.write(b'data')
        self.assertEqual(tr._conn_lost, 1)
        tr.write(b'data')
        self.assertEqual(tr._conn_lost, 2)

    def test__read_ready(self):
        # _read_ready() on a write pipe means the peer closed it: the
        # transport unregisters itself and reports connection_lost(None).
        tr = self.write_pipe_transport()
        tr._read_ready()
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)
        self.assertTrue(tr.is_closing())
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    @mock.patch('os.write')
    def test__write_ready(self, m_write):
        # The whole buffer is flushed: the writer is removed.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 4
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)

    @mock.patch('os.write')
    def test__write_ready_partial(self, m_write):
        # 3 of 4 bytes are flushed: the writer stays, one byte remains.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 3
        tr._write_ready()
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'a'), tr._buffer)

    @mock.patch('os.write')
    def test__write_ready_again(self, m_write):
        # BlockingIOError: nothing is flushed, the writer stays registered.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.side_effect = BlockingIOError()
        tr._write_ready()
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    @mock.patch('os.write')
    def test__write_ready_empty(self, m_write):
        # os.write reports 0 bytes written: the buffer is left untouched.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.return_value = 0
        tr._write_ready()
        m_write.assert_called_with(5, bytearray(b'data'))
        self.loop.assert_writer(5, tr._write_ready)
        self.assertEqual(bytearray(b'data'), tr._buffer)

    @mock.patch('asyncio.log.logger.error')
    @mock.patch('os.write')
    def test__write_ready_err(self, m_write, m_logexc):
        # An OSError while flushing closes the transport, discards the
        # buffer and reports the error to the protocol without logging it.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._buffer = bytearray(b'data')
        m_write.side_effect = err = OSError()
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertFalse(self.loop.readers)
        self.assertEqual(bytearray(), tr._buffer)
        self.assertTrue(tr.is_closing())
        m_logexc.assert_not_called()
        self.assertEqual(1, tr._conn_lost)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(err)

    @mock.patch('os.write')
    def test__write_ready_closing(self, m_write):
        # Flushing the last bytes of a closing transport finishes the
        # close: connection_lost(None) is delivered and the pipe is closed.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        tr._closing = True
        tr._buffer = bytearray(b'data')
        m_write.return_value = 4
        tr._write_ready()
        self.assertFalse(self.loop.writers)
        self.assertFalse(self.loop.readers)
        self.assertEqual(bytearray(), tr._buffer)
        self.protocol.connection_lost.assert_called_with(None)
        self.pipe.close.assert_called_with()

    @mock.patch('os.write')
    def test_abort(self, m_write):
        # abort() drops pending data without writing it.
        tr = self.write_pipe_transport()
        self.loop.add_writer(5, tr._write_ready)
        self.loop.add_reader(5, tr._read_ready)
        # Use a bytearray, like the other tests in this class, instead of
        # a list stand-in for the transport's buffer.
        tr._buffer = bytearray(b'data')
        tr.abort()
        self.assertFalse(m_write.called)
        self.assertFalse(self.loop.readers)
        self.assertFalse(self.loop.writers)
        self.assertEqual(bytearray(), tr._buffer)
        self.assertTrue(tr.is_closing())
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test__call_connection_lost(self):
        tr = self.write_pipe_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)

        err = None
        tr._call_connection_lost(err)
        self.protocol.connection_lost.assert_called_with(err)
        self.pipe.close.assert_called_with()

        # The transport must drop its references once the connection is lost.
        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    def test__call_connection_lost_with_err(self):
        tr = self.write_pipe_transport()
        self.assertIsNotNone(tr._protocol)
        self.assertIsNotNone(tr._loop)

        err = OSError()
        tr._call_connection_lost(err)
        self.protocol.connection_lost.assert_called_with(err)
        self.pipe.close.assert_called_with()

        # The transport must drop its references once the connection is lost.
        self.assertIsNone(tr._protocol)
        self.assertIsNone(tr._loop)

    def test_close(self):
        # close() is implemented in terms of write_eof().
        tr = self.write_pipe_transport()
        tr.write_eof = mock.Mock()
        tr.close()
        tr.write_eof.assert_called_with()

        # closing the transport twice must not fail
        tr.close()

    def test_close_closing(self):
        # close() on a transport already flagged as closing is a no-op.
        tr = self.write_pipe_transport()
        tr.write_eof = mock.Mock()
        tr._closing = True
        tr.close()
        self.assertFalse(tr.write_eof.called)

    def test_write_eof(self):
        # With nothing buffered, write_eof() closes the transport at once.
        tr = self.write_pipe_transport()
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.loop.readers)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_write_eof_pending(self):
        # With data still buffered, write_eof() only marks the transport
        # as closing; connection_lost must not have been called yet.
        tr = self.write_pipe_transport()
        # Use a bytearray, like the other tests in this class, instead of
        # a list stand-in for the transport's buffer.
        tr._buffer = bytearray(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.protocol.connection_lost.called)
1096
1097
class AbstractChildWatcherTests(unittest.TestCase):
    """Every method of asyncio.AbstractChildWatcher must be unimplemented."""

    def test_not_implemented(self):
        placeholder = mock.Mock()
        watcher = asyncio.AbstractChildWatcher()

        # (method name, positional arguments) for each abstract entry point.
        abstract_calls = (
            ('add_child_handler', (placeholder, placeholder)),
            ('remove_child_handler', (placeholder,)),
            ('attach_loop', (placeholder,)),
            ('close', ()),
            ('is_active', ()),
            ('__enter__', ()),
            ('__exit__', (placeholder, placeholder, placeholder)),
        )
        for method_name, call_args in abstract_calls:
            self.assertRaises(
                NotImplementedError,
                getattr(watcher, method_name), *call_args)
1117
1118
class BaseChildWatcherTests(unittest.TestCase):
    """unix_events.BaseChildWatcher leaves _do_waitpid() to subclasses."""

    def test_not_implemented(self):
        placeholder = mock.Mock()
        watcher = unix_events.BaseChildWatcher()
        with self.assertRaises(NotImplementedError):
            watcher._do_waitpid(placeholder)
1126
1127
# Bundle of the mocks that stand in for os.waitpid() and the os.W* status
# helpers while a child-watcher test runs.
WaitPidMocks = collections.namedtuple(
    "WaitPidMocks",
    ["waitpid", "WIFEXITED", "WIFSIGNALED", "WEXITSTATUS", "WTERMSIG"],
)
1135
1136
class ChildWatcherTestsMixin:
    """Shared child-watcher test scenarios, run against a faked waitpid().

    setUp() relies on ``self.new_test_loop()`` and ``self.create_watcher()``,
    neither of which is defined in the part of this mixin shown here, so the
    class it is mixed into is expected to provide them.
    """

    # Patcher that replaces log.logger.warning with a mock; tests use it as
    # a context manager around _sig_chld() calls.
    ignore_warnings = mock.patch.object(log.logger, "warning")
1140
1141    def setUp(self):
1142        super().setUp()
1143        self.loop = self.new_test_loop()
1144        self.running = False
1145        self.zombies = {}
1146
1147        with mock.patch.object(
1148                self.loop, "add_signal_handler") as self.m_add_signal_handler:
1149            self.watcher = self.create_watcher()
1150            self.watcher.attach_loop(self.loop)
1151
1152    def waitpid(self, pid, flags):
1153        if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
1154            self.assertGreater(pid, 0)
1155        try:
1156            if pid < 0:
1157                return self.zombies.popitem()
1158            else:
1159                return pid, self.zombies.pop(pid)
1160        except KeyError:
1161            pass
1162        if self.running:
1163            return 0, 0
1164        else:
1165            raise ChildProcessError()
1166
1167    def add_zombie(self, pid, returncode):
1168        self.zombies[pid] = returncode + 32768
1169
1170    def WIFEXITED(self, status):
1171        return status >= 32768
1172
1173    def WIFSIGNALED(self, status):
1174        return 32700 < status < 32768
1175
1176    def WEXITSTATUS(self, status):
1177        self.assertTrue(self.WIFEXITED(status))
1178        return status - 32768
1179
1180    def WTERMSIG(self, status):
1181        self.assertTrue(self.WIFSIGNALED(status))
1182        return 32768 - status
1183
1184    def test_create_watcher(self):
1185        self.m_add_signal_handler.assert_called_once_with(
1186            signal.SIGCHLD, self.watcher._sig_chld)
1187
1188    def waitpid_mocks(func):
1189        def wrapped_func(self):
1190            def patch(target, wrapper):
1191                return mock.patch(target, wraps=wrapper,
1192                                  new_callable=mock.Mock)
1193
1194            with patch('os.WTERMSIG', self.WTERMSIG) as m_WTERMSIG, \
1195                 patch('os.WEXITSTATUS', self.WEXITSTATUS) as m_WEXITSTATUS, \
1196                 patch('os.WIFSIGNALED', self.WIFSIGNALED) as m_WIFSIGNALED, \
1197                 patch('os.WIFEXITED', self.WIFEXITED) as m_WIFEXITED, \
1198                 patch('os.waitpid', self.waitpid) as m_waitpid:
1199                func(self, WaitPidMocks(m_waitpid,
1200                                        m_WIFEXITED, m_WIFSIGNALED,
1201                                        m_WEXITSTATUS, m_WTERMSIG,
1202                                        ))
1203        return wrapped_func
1204
1205    @waitpid_mocks
1206    def test_sigchld(self, m):
1207        # register a child
1208        callback = mock.Mock()
1209
1210        with self.watcher:
1211            self.running = True
1212            self.watcher.add_child_handler(42, callback, 9, 10, 14)
1213
1214        self.assertFalse(callback.called)
1215        self.assertFalse(m.WIFEXITED.called)
1216        self.assertFalse(m.WIFSIGNALED.called)
1217        self.assertFalse(m.WEXITSTATUS.called)
1218        self.assertFalse(m.WTERMSIG.called)
1219
1220        # child is running
1221        self.watcher._sig_chld()
1222
1223        self.assertFalse(callback.called)
1224        self.assertFalse(m.WIFEXITED.called)
1225        self.assertFalse(m.WIFSIGNALED.called)
1226        self.assertFalse(m.WEXITSTATUS.called)
1227        self.assertFalse(m.WTERMSIG.called)
1228
1229        # child terminates (returncode 12)
1230        self.running = False
1231        self.add_zombie(42, 12)
1232        self.watcher._sig_chld()
1233
1234        self.assertTrue(m.WIFEXITED.called)
1235        self.assertTrue(m.WEXITSTATUS.called)
1236        self.assertFalse(m.WTERMSIG.called)
1237        callback.assert_called_once_with(42, 12, 9, 10, 14)
1238
1239        m.WIFSIGNALED.reset_mock()
1240        m.WIFEXITED.reset_mock()
1241        m.WEXITSTATUS.reset_mock()
1242        callback.reset_mock()
1243
1244        # ensure that the child is effectively reaped
1245        self.add_zombie(42, 13)
1246        with self.ignore_warnings:
1247            self.watcher._sig_chld()
1248
1249        self.assertFalse(callback.called)
1250        self.assertFalse(m.WTERMSIG.called)
1251
1252        m.WIFSIGNALED.reset_mock()
1253        m.WIFEXITED.reset_mock()
1254        m.WEXITSTATUS.reset_mock()
1255
1256        # sigchld called again
1257        self.zombies.clear()
1258        self.watcher._sig_chld()
1259
1260        self.assertFalse(callback.called)
1261        self.assertFalse(m.WIFEXITED.called)
1262        self.assertFalse(m.WIFSIGNALED.called)
1263        self.assertFalse(m.WEXITSTATUS.called)
1264        self.assertFalse(m.WTERMSIG.called)
1265
1266    @waitpid_mocks
1267    def test_sigchld_two_children(self, m):
1268        callback1 = mock.Mock()
1269        callback2 = mock.Mock()
1270
1271        # register child 1
1272        with self.watcher:
1273            self.running = True
1274            self.watcher.add_child_handler(43, callback1, 7, 8)
1275
1276        self.assertFalse(callback1.called)
1277        self.assertFalse(callback2.called)
1278        self.assertFalse(m.WIFEXITED.called)
1279        self.assertFalse(m.WIFSIGNALED.called)
1280        self.assertFalse(m.WEXITSTATUS.called)
1281        self.assertFalse(m.WTERMSIG.called)
1282
1283        # register child 2
1284        with self.watcher:
1285            self.watcher.add_child_handler(44, callback2, 147, 18)
1286
1287        self.assertFalse(callback1.called)
1288        self.assertFalse(callback2.called)
1289        self.assertFalse(m.WIFEXITED.called)
1290        self.assertFalse(m.WIFSIGNALED.called)
1291        self.assertFalse(m.WEXITSTATUS.called)
1292        self.assertFalse(m.WTERMSIG.called)
1293
1294        # children are running
1295        self.watcher._sig_chld()
1296
1297        self.assertFalse(callback1.called)
1298        self.assertFalse(callback2.called)
1299        self.assertFalse(m.WIFEXITED.called)
1300        self.assertFalse(m.WIFSIGNALED.called)
1301        self.assertFalse(m.WEXITSTATUS.called)
1302        self.assertFalse(m.WTERMSIG.called)
1303
1304        # child 1 terminates (signal 3)
1305        self.add_zombie(43, -3)
1306        self.watcher._sig_chld()
1307
1308        callback1.assert_called_once_with(43, -3, 7, 8)
1309        self.assertFalse(callback2.called)
1310        self.assertTrue(m.WIFSIGNALED.called)
1311        self.assertFalse(m.WEXITSTATUS.called)
1312        self.assertTrue(m.WTERMSIG.called)
1313
1314        m.WIFSIGNALED.reset_mock()
1315        m.WIFEXITED.reset_mock()
1316        m.WTERMSIG.reset_mock()
1317        callback1.reset_mock()
1318
1319        # child 2 still running
1320        self.watcher._sig_chld()
1321
1322        self.assertFalse(callback1.called)
1323        self.assertFalse(callback2.called)
1324        self.assertFalse(m.WIFEXITED.called)
1325        self.assertFalse(m.WIFSIGNALED.called)
1326        self.assertFalse(m.WEXITSTATUS.called)
1327        self.assertFalse(m.WTERMSIG.called)
1328
1329        # child 2 terminates (code 108)
1330        self.add_zombie(44, 108)
1331        self.running = False
1332        self.watcher._sig_chld()
1333
1334        callback2.assert_called_once_with(44, 108, 147, 18)
1335        self.assertFalse(callback1.called)
1336        self.assertTrue(m.WIFEXITED.called)
1337        self.assertTrue(m.WEXITSTATUS.called)
1338        self.assertFalse(m.WTERMSIG.called)
1339
1340        m.WIFSIGNALED.reset_mock()
1341        m.WIFEXITED.reset_mock()
1342        m.WEXITSTATUS.reset_mock()
1343        callback2.reset_mock()
1344
1345        # ensure that the children are effectively reaped
1346        self.add_zombie(43, 14)
1347        self.add_zombie(44, 15)
1348        with self.ignore_warnings:
1349            self.watcher._sig_chld()
1350
1351        self.assertFalse(callback1.called)
1352        self.assertFalse(callback2.called)
1353        self.assertFalse(m.WTERMSIG.called)
1354
1355        m.WIFSIGNALED.reset_mock()
1356        m.WIFEXITED.reset_mock()
1357        m.WEXITSTATUS.reset_mock()
1358
1359        # sigchld called again
1360        self.zombies.clear()
1361        self.watcher._sig_chld()
1362
1363        self.assertFalse(callback1.called)
1364        self.assertFalse(callback2.called)
1365        self.assertFalse(m.WIFEXITED.called)
1366        self.assertFalse(m.WIFSIGNALED.called)
1367        self.assertFalse(m.WEXITSTATUS.called)
1368        self.assertFalse(m.WTERMSIG.called)
1369
1370    @waitpid_mocks
1371    def test_sigchld_two_children_terminating_together(self, m):
1372        callback1 = mock.Mock()
1373        callback2 = mock.Mock()
1374
1375        # register child 1
1376        with self.watcher:
1377            self.running = True
1378            self.watcher.add_child_handler(45, callback1, 17, 8)
1379
1380        self.assertFalse(callback1.called)
1381        self.assertFalse(callback2.called)
1382        self.assertFalse(m.WIFEXITED.called)
1383        self.assertFalse(m.WIFSIGNALED.called)
1384        self.assertFalse(m.WEXITSTATUS.called)
1385        self.assertFalse(m.WTERMSIG.called)
1386
1387        # register child 2
1388        with self.watcher:
1389            self.watcher.add_child_handler(46, callback2, 1147, 18)
1390
1391        self.assertFalse(callback1.called)
1392        self.assertFalse(callback2.called)
1393        self.assertFalse(m.WIFEXITED.called)
1394        self.assertFalse(m.WIFSIGNALED.called)
1395        self.assertFalse(m.WEXITSTATUS.called)
1396        self.assertFalse(m.WTERMSIG.called)
1397
1398        # children are running
1399        self.watcher._sig_chld()
1400
1401        self.assertFalse(callback1.called)
1402        self.assertFalse(callback2.called)
1403        self.assertFalse(m.WIFEXITED.called)
1404        self.assertFalse(m.WIFSIGNALED.called)
1405        self.assertFalse(m.WEXITSTATUS.called)
1406        self.assertFalse(m.WTERMSIG.called)
1407
1408        # child 1 terminates (code 78)
1409        # child 2 terminates (signal 5)
1410        self.add_zombie(45, 78)
1411        self.add_zombie(46, -5)
1412        self.running = False
1413        self.watcher._sig_chld()
1414
1415        callback1.assert_called_once_with(45, 78, 17, 8)
1416        callback2.assert_called_once_with(46, -5, 1147, 18)
1417        self.assertTrue(m.WIFSIGNALED.called)
1418        self.assertTrue(m.WIFEXITED.called)
1419        self.assertTrue(m.WEXITSTATUS.called)
1420        self.assertTrue(m.WTERMSIG.called)
1421
1422        m.WIFSIGNALED.reset_mock()
1423        m.WIFEXITED.reset_mock()
1424        m.WTERMSIG.reset_mock()
1425        m.WEXITSTATUS.reset_mock()
1426        callback1.reset_mock()
1427        callback2.reset_mock()
1428
1429        # ensure that the children are effectively reaped
1430        self.add_zombie(45, 14)
1431        self.add_zombie(46, 15)
1432        with self.ignore_warnings:
1433            self.watcher._sig_chld()
1434
1435        self.assertFalse(callback1.called)
1436        self.assertFalse(callback2.called)
1437        self.assertFalse(m.WTERMSIG.called)
1438
1439    @waitpid_mocks
1440    def test_sigchld_race_condition(self, m):
1441        # register a child
1442        callback = mock.Mock()
1443
1444        with self.watcher:
1445            # child terminates before being registered
1446            self.add_zombie(50, 4)
1447            self.watcher._sig_chld()
1448
1449            self.watcher.add_child_handler(50, callback, 1, 12)
1450
1451        callback.assert_called_once_with(50, 4, 1, 12)
1452        callback.reset_mock()
1453
1454        # ensure that the child is effectively reaped
1455        self.add_zombie(50, -1)
1456        with self.ignore_warnings:
1457            self.watcher._sig_chld()
1458
1459        self.assertFalse(callback.called)
1460
1461    @waitpid_mocks
1462    def test_sigchld_replace_handler(self, m):
1463        callback1 = mock.Mock()
1464        callback2 = mock.Mock()
1465
1466        # register a child
1467        with self.watcher:
1468            self.running = True
1469            self.watcher.add_child_handler(51, callback1, 19)
1470
1471        self.assertFalse(callback1.called)
1472        self.assertFalse(callback2.called)
1473        self.assertFalse(m.WIFEXITED.called)
1474        self.assertFalse(m.WIFSIGNALED.called)
1475        self.assertFalse(m.WEXITSTATUS.called)
1476        self.assertFalse(m.WTERMSIG.called)
1477
1478        # register the same child again
1479        with self.watcher:
1480            self.watcher.add_child_handler(51, callback2, 21)
1481
1482        self.assertFalse(callback1.called)
1483        self.assertFalse(callback2.called)
1484        self.assertFalse(m.WIFEXITED.called)
1485        self.assertFalse(m.WIFSIGNALED.called)
1486        self.assertFalse(m.WEXITSTATUS.called)
1487        self.assertFalse(m.WTERMSIG.called)
1488
1489        # child terminates (signal 8)
1490        self.running = False
1491        self.add_zombie(51, -8)
1492        self.watcher._sig_chld()
1493
1494        callback2.assert_called_once_with(51, -8, 21)
1495        self.assertFalse(callback1.called)
1496        self.assertTrue(m.WIFSIGNALED.called)
1497        self.assertFalse(m.WEXITSTATUS.called)
1498        self.assertTrue(m.WTERMSIG.called)
1499
1500        m.WIFSIGNALED.reset_mock()
1501        m.WIFEXITED.reset_mock()
1502        m.WTERMSIG.reset_mock()
1503        callback2.reset_mock()
1504
1505        # ensure that the child is effectively reaped
1506        self.add_zombie(51, 13)
1507        with self.ignore_warnings:
1508            self.watcher._sig_chld()
1509
1510        self.assertFalse(callback1.called)
1511        self.assertFalse(callback2.called)
1512        self.assertFalse(m.WTERMSIG.called)
1513
1514    @waitpid_mocks
1515    def test_sigchld_remove_handler(self, m):
1516        callback = mock.Mock()
1517
1518        # register a child
1519        with self.watcher:
1520            self.running = True
1521            self.watcher.add_child_handler(52, callback, 1984)
1522
1523        self.assertFalse(callback.called)
1524        self.assertFalse(m.WIFEXITED.called)
1525        self.assertFalse(m.WIFSIGNALED.called)
1526        self.assertFalse(m.WEXITSTATUS.called)
1527        self.assertFalse(m.WTERMSIG.called)
1528
1529        # unregister the child
1530        self.watcher.remove_child_handler(52)
1531
1532        self.assertFalse(callback.called)
1533        self.assertFalse(m.WIFEXITED.called)
1534        self.assertFalse(m.WIFSIGNALED.called)
1535        self.assertFalse(m.WEXITSTATUS.called)
1536        self.assertFalse(m.WTERMSIG.called)
1537
1538        # child terminates (code 99)
1539        self.running = False
1540        self.add_zombie(52, 99)
1541        with self.ignore_warnings:
1542            self.watcher._sig_chld()
1543
1544        self.assertFalse(callback.called)
1545
1546    @waitpid_mocks
1547    def test_sigchld_unknown_status(self, m):
1548        callback = mock.Mock()
1549
1550        # register a child
1551        with self.watcher:
1552            self.running = True
1553            self.watcher.add_child_handler(53, callback, -19)
1554
1555        self.assertFalse(callback.called)
1556        self.assertFalse(m.WIFEXITED.called)
1557        self.assertFalse(m.WIFSIGNALED.called)
1558        self.assertFalse(m.WEXITSTATUS.called)
1559        self.assertFalse(m.WTERMSIG.called)
1560
1561        # terminate with unknown status
1562        self.zombies[53] = 1178
1563        self.running = False
1564        self.watcher._sig_chld()
1565
1566        callback.assert_called_once_with(53, 1178, -19)
1567        self.assertTrue(m.WIFEXITED.called)
1568        self.assertTrue(m.WIFSIGNALED.called)
1569        self.assertFalse(m.WEXITSTATUS.called)
1570        self.assertFalse(m.WTERMSIG.called)
1571
1572        callback.reset_mock()
1573        m.WIFEXITED.reset_mock()
1574        m.WIFSIGNALED.reset_mock()
1575
1576        # ensure that the child is effectively reaped
1577        self.add_zombie(53, 101)
1578        with self.ignore_warnings:
1579            self.watcher._sig_chld()
1580
1581        self.assertFalse(callback.called)
1582
1583    @waitpid_mocks
1584    def test_remove_child_handler(self, m):
1585        callback1 = mock.Mock()
1586        callback2 = mock.Mock()
1587        callback3 = mock.Mock()
1588
1589        # register children
1590        with self.watcher:
1591            self.running = True
1592            self.watcher.add_child_handler(54, callback1, 1)
1593            self.watcher.add_child_handler(55, callback2, 2)
1594            self.watcher.add_child_handler(56, callback3, 3)
1595
1596        # remove child handler 1
1597        self.assertTrue(self.watcher.remove_child_handler(54))
1598
1599        # remove child handler 2 multiple times
1600        self.assertTrue(self.watcher.remove_child_handler(55))
1601        self.assertFalse(self.watcher.remove_child_handler(55))
1602        self.assertFalse(self.watcher.remove_child_handler(55))
1603
1604        # all children terminate
1605        self.add_zombie(54, 0)
1606        self.add_zombie(55, 1)
1607        self.add_zombie(56, 2)
1608        self.running = False
1609        with self.ignore_warnings:
1610            self.watcher._sig_chld()
1611
1612        self.assertFalse(callback1.called)
1613        self.assertFalse(callback2.called)
1614        callback3.assert_called_once_with(56, 2, 3)
1615
1616    @waitpid_mocks
1617    def test_sigchld_unhandled_exception(self, m):
1618        callback = mock.Mock()
1619
1620        # register a child
1621        with self.watcher:
1622            self.running = True
1623            self.watcher.add_child_handler(57, callback)
1624
1625        # raise an exception
1626        m.waitpid.side_effect = ValueError
1627
1628        with mock.patch.object(log.logger,
1629                               'error') as m_error:
1630
1631            self.assertEqual(self.watcher._sig_chld(), None)
1632            self.assertTrue(m_error.called)
1633
1634    @waitpid_mocks
1635    def test_sigchld_child_reaped_elsewhere(self, m):
1636        # register a child
1637        callback = mock.Mock()
1638
1639        with self.watcher:
1640            self.running = True
1641            self.watcher.add_child_handler(58, callback)
1642
1643        self.assertFalse(callback.called)
1644        self.assertFalse(m.WIFEXITED.called)
1645        self.assertFalse(m.WIFSIGNALED.called)
1646        self.assertFalse(m.WEXITSTATUS.called)
1647        self.assertFalse(m.WTERMSIG.called)
1648
1649        # child terminates
1650        self.running = False
1651        self.add_zombie(58, 4)
1652
1653        # waitpid is called elsewhere
1654        os.waitpid(58, os.WNOHANG)
1655
1656        m.waitpid.reset_mock()
1657
1658        # sigchld
1659        with self.ignore_warnings:
1660            self.watcher._sig_chld()
1661
1662        if isinstance(self.watcher, asyncio.FastChildWatcher):
1663            # here the FastChildWatche enters a deadlock
1664            # (there is no way to prevent it)
1665            self.assertFalse(callback.called)
1666        else:
1667            callback.assert_called_once_with(58, 255)
1668
1669    @waitpid_mocks
1670    def test_sigchld_unknown_pid_during_registration(self, m):
1671        # register two children
1672        callback1 = mock.Mock()
1673        callback2 = mock.Mock()
1674
1675        with self.ignore_warnings, self.watcher:
1676            self.running = True
1677            # child 1 terminates
1678            self.add_zombie(591, 7)
1679            # an unknown child terminates
1680            self.add_zombie(593, 17)
1681
1682            self.watcher._sig_chld()
1683
1684            self.watcher.add_child_handler(591, callback1)
1685            self.watcher.add_child_handler(592, callback2)
1686
1687        callback1.assert_called_once_with(591, 7)
1688        self.assertFalse(callback2.called)
1689
1690    @waitpid_mocks
1691    def test_set_loop(self, m):
1692        # register a child
1693        callback = mock.Mock()
1694
1695        with self.watcher:
1696            self.running = True
1697            self.watcher.add_child_handler(60, callback)
1698
1699        # attach a new loop
1700        old_loop = self.loop
1701        self.loop = self.new_test_loop()
1702        patch = mock.patch.object
1703
1704        with patch(old_loop, "remove_signal_handler") as m_old_remove, \
1705             patch(self.loop, "add_signal_handler") as m_new_add:
1706
1707            self.watcher.attach_loop(self.loop)
1708
1709            m_old_remove.assert_called_once_with(
1710                signal.SIGCHLD)
1711            m_new_add.assert_called_once_with(
1712                signal.SIGCHLD, self.watcher._sig_chld)
1713
1714        # child terminates
1715        self.running = False
1716        self.add_zombie(60, 9)
1717        self.watcher._sig_chld()
1718
1719        callback.assert_called_once_with(60, 9)
1720
1721    @waitpid_mocks
1722    def test_set_loop_race_condition(self, m):
1723        # register 3 children
1724        callback1 = mock.Mock()
1725        callback2 = mock.Mock()
1726        callback3 = mock.Mock()
1727
1728        with self.watcher:
1729            self.running = True
1730            self.watcher.add_child_handler(61, callback1)
1731            self.watcher.add_child_handler(62, callback2)
1732            self.watcher.add_child_handler(622, callback3)
1733
1734        # detach the loop
1735        old_loop = self.loop
1736        self.loop = None
1737
1738        with mock.patch.object(
1739                old_loop, "remove_signal_handler") as m_remove_signal_handler:
1740
1741            with self.assertWarnsRegex(
1742                    RuntimeWarning, 'A loop is being detached'):
1743                self.watcher.attach_loop(None)
1744
1745            m_remove_signal_handler.assert_called_once_with(
1746                signal.SIGCHLD)
1747
1748        # child 1 & 2 terminate
1749        self.add_zombie(61, 11)
1750        self.add_zombie(62, -5)
1751
1752        # SIGCHLD was not caught
1753        self.assertFalse(callback1.called)
1754        self.assertFalse(callback2.called)
1755        self.assertFalse(callback3.called)
1756
1757        # attach a new loop
1758        self.loop = self.new_test_loop()
1759
1760        with mock.patch.object(
1761                self.loop, "add_signal_handler") as m_add_signal_handler:
1762
1763            self.watcher.attach_loop(self.loop)
1764
1765            m_add_signal_handler.assert_called_once_with(
1766                signal.SIGCHLD, self.watcher._sig_chld)
1767            callback1.assert_called_once_with(61, 11)  # race condition!
1768            callback2.assert_called_once_with(62, -5)  # race condition!
1769            self.assertFalse(callback3.called)
1770
1771        callback1.reset_mock()
1772        callback2.reset_mock()
1773
1774        # child 3 terminates
1775        self.running = False
1776        self.add_zombie(622, 19)
1777        self.watcher._sig_chld()
1778
1779        self.assertFalse(callback1.called)
1780        self.assertFalse(callback2.called)
1781        callback3.assert_called_once_with(622, 19)
1782
1783    @waitpid_mocks
1784    def test_close(self, m):
1785        # register two children
1786        callback1 = mock.Mock()
1787
1788        with self.watcher:
1789            self.running = True
1790            # child 1 terminates
1791            self.add_zombie(63, 9)
1792            # other child terminates
1793            self.add_zombie(65, 18)
1794            self.watcher._sig_chld()
1795
1796            self.watcher.add_child_handler(63, callback1)
1797            self.watcher.add_child_handler(64, callback1)
1798
1799            self.assertEqual(len(self.watcher._callbacks), 1)
1800            if isinstance(self.watcher, asyncio.FastChildWatcher):
1801                self.assertEqual(len(self.watcher._zombies), 1)
1802
1803            with mock.patch.object(
1804                    self.loop,
1805                    "remove_signal_handler") as m_remove_signal_handler:
1806
1807                self.watcher.close()
1808
1809                m_remove_signal_handler.assert_called_once_with(
1810                    signal.SIGCHLD)
1811                self.assertFalse(self.watcher._callbacks)
1812                if isinstance(self.watcher, asyncio.FastChildWatcher):
1813                    self.assertFalse(self.watcher._zombies)
1814
1815
class SafeChildWatcherTests(ChildWatcherTestsMixin, test_utils.TestCase):
    # Runs the shared ChildWatcherTestsMixin tests against SafeChildWatcher.

    def create_watcher(self):
        watcher = asyncio.SafeChildWatcher()
        return watcher
1819
1820
class FastChildWatcherTests(ChildWatcherTestsMixin, test_utils.TestCase):
    # Runs the shared ChildWatcherTestsMixin tests against FastChildWatcher.

    def create_watcher(self):
        watcher = asyncio.FastChildWatcher()
        return watcher
1824
1825
class PolicyTests(unittest.TestCase):
    # Tests for how the default event loop policy creates, stores and
    # re-attaches its child watcher.

    def create_policy(self):
        return asyncio.DefaultEventLoopPolicy()

    def test_get_default_child_watcher(self):
        policy = self.create_policy()
        self.assertIsNone(policy._watcher)

        # the watcher is created on first access ...
        watcher = policy.get_child_watcher()
        self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)

        # ... stored on the policy ...
        self.assertIs(policy._watcher, watcher)

        # ... and returned again by later calls
        self.assertIs(watcher, policy.get_child_watcher())

    def test_get_child_watcher_after_set(self):
        policy = self.create_policy()
        watcher = asyncio.FastChildWatcher()

        policy.set_child_watcher(watcher)
        self.assertIs(policy._watcher, watcher)
        self.assertIs(watcher, policy.get_child_watcher())

    def test_get_child_watcher_thread(self):
        # An exception raised inside a Thread target is not propagated to
        # the thread that joins it, so failures are collected here and
        # re-raised from the test thread; otherwise a failing assertion in
        # f() would leave the test green.
        errors = []

        def f():
            try:
                loop = policy.new_event_loop()
                try:
                    policy.set_event_loop(loop)

                    self.assertIsInstance(policy.get_event_loop(),
                                          asyncio.AbstractEventLoop)
                    watcher = policy.get_child_watcher()

                    self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
                    self.assertIsNone(watcher._loop)
                finally:
                    # always release the loop, even when a check failed
                    loop.close()
            except Exception as exc:
                errors.append(exc)

        policy = self.create_policy()
        policy.set_child_watcher(asyncio.SafeChildWatcher())

        th = threading.Thread(target=f)
        th.start()
        th.join()

        if errors:
            raise errors[0]

    def test_child_watcher_replace_mainloop_existing(self):
        policy = self.create_policy()
        loop = policy.get_event_loop()
        # cleanups run even if an assertion below fails, so no loop leaks
        self.addCleanup(loop.close)

        # Explicitly setup SafeChildWatcher,
        # default ThreadedChildWatcher has no _loop property
        watcher = asyncio.SafeChildWatcher()
        policy.set_child_watcher(watcher)
        watcher.attach_loop(loop)

        self.assertIs(watcher._loop, loop)

        new_loop = policy.new_event_loop()
        self.addCleanup(new_loop.close)
        policy.set_event_loop(new_loop)

        self.assertIs(watcher._loop, new_loop)

        policy.set_event_loop(None)

        self.assertIsNone(watcher._loop)
1894
1895
class TestFunctional(unittest.TestCase):
    # Exercises add_reader()/add_writer() and the matching remove_*() calls
    # on a real event loop that is created anew for every test.

    def setUp(self):
        loop = asyncio.new_event_loop()
        self.loop = loop
        asyncio.set_event_loop(loop)

    def tearDown(self):
        self.loop.close()
        asyncio.set_event_loop(None)

    def test_add_reader_invalid_argument(self):
        # A bare object() is not a valid file object: each call below is
        # expected to raise ValueError.
        def noop():
            return None

        def expect_invalid():
            return self.assertRaisesRegex(ValueError, r'Invalid file object')

        for register in (self.loop.add_reader, self.loop.add_writer):
            with expect_invalid():
                register(object(), noop)

        for unregister in (self.loop.remove_reader, self.loop.remove_writer):
            with expect_invalid():
                unregister(object())

    def test_add_reader_or_writer_transport_fd(self):
        # While a transport owns the socket, registering or removing a
        # reader/writer for it must raise RuntimeError, whether the socket
        # object or its file descriptor number is passed.
        def noop():
            return None

        def expect_in_use():
            return self.assertRaisesRegex(
                RuntimeError,
                r'File descriptor .* is used by transport')

        async def runner():
            transport, _ = await self.loop.create_connection(
                asyncio.Protocol, sock=rsock)

            try:
                loop = self.loop
                pairs = (
                    (loop.add_reader, loop.remove_reader),
                    (loop.add_writer, loop.remove_writer),
                )
                for add, remove in pairs:
                    for target in (rsock, rsock.fileno()):
                        with expect_in_use():
                            add(target, noop)
                    for target in (rsock, rsock.fileno()):
                        with expect_in_use():
                            remove(target)
            finally:
                transport.close()

        rsock, wsock = socket.socketpair()
        try:
            self.loop.run_until_complete(runner())
        finally:
            rsock.close()
            wsock.close()
1964
1965
# Run the tests of this module when the file is executed as a script.
if '__main__' == __name__:
    unittest.main()
1968