• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for events.py."""
2
3import collections.abc
4import concurrent.futures
5import functools
6import io
7import os
8import platform
9import re
10import signal
11import socket
12try:
13    import ssl
14except ImportError:
15    ssl = None
16import subprocess
17import sys
18import threading
19import time
20import errno
21import unittest
22from unittest import mock
23import weakref
24
25if sys.platform != 'win32':
26    import tty
27
28import asyncio
29from asyncio import coroutines
30from asyncio import events
31from asyncio import proactor_events
32from asyncio import selector_events
33from test.test_asyncio import utils as test_utils
34from test import support
35
36
def tearDownModule():
    """Reset the global asyncio event loop policy after this module's tests."""
    asyncio.set_event_loop_policy(policy=None)
39
40
def broken_unix_getsockname():
    """Return True if the platform is AIX, or Mac OS X 10.4 or older.

    Callers use the result to decide whether the 'sockname' of a UNIX
    socket transport can be checked.  Every other platform returns False.
    """
    if sys.platform.startswith("aix"):
        return True
    if sys.platform != 'darwin':
        return False
    release = platform.mac_ver()[0]
    try:
        version = tuple(map(int, release.split('.')))
    except ValueError:
        # mac_ver() reports '' when the release cannot be determined; treat
        # an unparsable release as "not broken" rather than crashing.
        return False
    return version < (10, 5)
50
51
def _test_get_event_loop_new_process__sub_proc():
    """Run a trivial coroutine on a brand-new event loop and return 'hello'.

    The loop is installed as the current event loop while the coroutine
    runs and is closed before returning, so its resources are not leaked.
    (Judging by the name, this is meant to be called in a child process.)
    """
    async def doit():
        return 'hello'

    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(doit())
    finally:
        # Always release the loop's resources, even if the coroutine fails.
        loop.close()
59
60
class CoroLike:
    """Bare object exposing the coroutine method names; every method is a no-op."""

    def send(self, v):
        """Accept a value and do nothing."""

    def throw(self, *exc):
        """Accept exception arguments and do nothing."""

    def close(self):
        """Do nothing."""

    def __await__(self):
        """Do nothing (returns None, not an iterator)."""
73
74
class MyBaseProto(asyncio.Protocol):
    """Stream protocol that records its state and the number of bytes received.

    ``state`` moves INITIAL -> CONNECTED -> (EOF ->) CLOSED and each callback
    asserts that it is called in a legal state.  When a loop is passed to the
    constructor, the ``connected`` and ``done`` futures are resolved from
    connection_made() and connection_lost() respectively.
    """
    # Class-level defaults used when no loop is supplied.
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is None:
            return
        self.connected = loop.create_future()
        self.done = loop.create_future()

    def connection_made(self, transport):
        """Remember the transport and mark the protocol as connected."""
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        connected = self.connected
        if connected:
            connected.set_result(None)

    def data_received(self, data):
        """Add the size of the received chunk to ``nbytes``."""
        assert self.state == 'CONNECTED', self.state
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        """Record that the peer signalled end of file."""
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        """Mark the protocol closed and resolve ``done`` if it exists."""
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        done = self.done
        if done:
            done.set_result(None)
107
108
class MyProto(MyBaseProto):
    """MyBaseProto that writes a minimal HTTP/1.0 GET request once connected."""

    def connection_made(self, transport):
        super().connection_made(transport)
        request = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
        transport.write(request)
113
114
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol that tracks its state and the bytes it received.

    ``state`` moves INITIAL -> INITIALIZED -> CLOSED; each callback asserts
    the expected state.  With a loop, ``done`` is a future resolved from
    connection_lost().
    """
    # Class-level default used when no loop is supplied.
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is None:
            return
        self.done = loop.create_future()

    def connection_made(self, transport):
        """Remember the transport and switch to INITIALIZED."""
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        """Add the datagram size to ``nbytes``; the address is ignored."""
        assert self.state == 'INITIALIZED', self.state
        self.nbytes = self.nbytes + len(data)

    def error_received(self, exc):
        """Only verify that the protocol is in the INITIALIZED state."""
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        """Mark the protocol closed and resolve ``done`` if it exists."""
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        done = self.done
        if done:
            done.set_result(None)
141
142
class MyReadPipeProto(asyncio.Protocol):
    """Read-pipe protocol that keeps the full history of its states in a list.

    ``state`` grows from ['INITIAL'] to
    ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED']; each callback asserts the
    history seen so far.  With a loop, ``done`` is a future resolved from
    connection_lost().
    """
    # Class-level default used when no loop is supplied.
    done = None

    def __init__(self, loop=None):
        self.state = ['INITIAL']
        self.nbytes = 0
        self.transport = None
        if loop is None:
            return
        self.done = loop.create_future()

    def connection_made(self, transport):
        """Remember the transport and append CONNECTED to the history."""
        self.transport = transport
        assert self.state == ['INITIAL'], self.state
        self.state.append('CONNECTED')

    def data_received(self, data):
        """Add the size of the received chunk to ``nbytes``."""
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        """Append EOF to the history."""
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.state.append('EOF')

    def connection_lost(self, exc):
        """Append CLOSED (adding a missed EOF first) and resolve ``done``."""
        history = self.state
        if 'EOF' not in history:
            history.append('EOF')  # It is okay if EOF is missed.
        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
        history.append('CLOSED')
        done = self.done
        if done:
            done.set_result(None)
173
174
class MyWritePipeProto(asyncio.BaseProtocol):
    """Write-pipe protocol tracking INITIAL -> CONNECTED -> CLOSED.

    With a loop, ``done`` is a future resolved from connection_lost().
    """
    # Class-level default used when no loop is supplied.
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.transport = None
        if loop is None:
            return
        self.done = loop.create_future()

    def connection_made(self, transport):
        """Remember the transport and mark the protocol as connected."""
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'

    def connection_lost(self, exc):
        """Mark the protocol closed and resolve ``done`` if it exists."""
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        done = self.done
        if done:
            done.set_result(None)
194
195
class MySubprocessProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that records pipe data, disconnects and exit code.

    ``connected`` and ``completed`` are futures resolved from
    connection_made() and connection_lost(); ``disconnects`` holds one
    future per fd 0..2; ``data`` and ``got_data`` collect output for fds 1
    and 2.
    """

    def __init__(self, loop):
        self.state = 'INITIAL'
        self.transport = None
        self.connected = loop.create_future()
        self.completed = loop.create_future()
        self.disconnects = {}
        for fd in range(3):
            self.disconnects[fd] = loop.create_future()
        self.data = {1: b'', 2: b''}
        self.returncode = None
        # NOTE(review): the ``loop`` argument of asyncio.Event is deprecated
        # since Python 3.8 and rejected by newer releases -- confirm which
        # interpreter versions this file must support.
        self.got_data = {fd: asyncio.Event(loop=loop) for fd in (1, 2)}

    def connection_made(self, transport):
        """Remember the transport and resolve ``connected``."""
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        self.connected.set_result(None)

    def connection_lost(self, exc):
        """Mark the protocol closed and resolve ``completed``."""
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        self.completed.set_result(None)

    def pipe_data_received(self, fd, data):
        """Append *data* to the buffer for *fd* and set its event."""
        assert self.state == 'CONNECTED', self.state
        self.data[fd] = self.data[fd] + data
        self.got_data[fd].set()

    def pipe_connection_lost(self, fd, exc):
        """Resolve the disconnect future for *fd*, failing it if *exc* is set."""
        assert self.state == 'CONNECTED', self.state
        if not exc:
            self.disconnects[fd].set_result(exc)
        else:
            self.disconnects[fd].set_exception(exc)

    def process_exited(self):
        """Store the return code reported by the transport."""
        assert self.state == 'CONNECTED', self.state
        self.returncode = self.transport.get_returncode()
235
236
237class EventLoopTestsMixin:
238
239    def setUp(self):
240        super().setUp()
241        self.loop = self.create_event_loop()
242        self.set_event_loop(self.loop)
243
244    def tearDown(self):
245        # just in case if we have transport close callbacks
246        if not self.loop.is_closed():
247            test_utils.run_briefly(self.loop)
248
249        self.doCleanups()
250        support.gc_collect()
251        super().tearDown()
252
    def test_run_until_complete_nesting(self):
        """run_until_complete() called while the loop is running must raise.

        coro2 runs on the loop and tries to re-enter it with coro1.  The
        outer call is expected to raise RuntimeError, and a "never awaited"
        RuntimeWarning is expected inside the block.
        """
        async def coro1():
            await asyncio.sleep(0)

        async def coro2():
            self.assertTrue(self.loop.is_running())
            # Re-entering the already running loop.
            self.loop.run_until_complete(coro1())

        with self.assertWarnsRegex(
            RuntimeWarning,
            r"coroutine \S+ was never awaited"
        ):
            self.assertRaises(
                RuntimeError, self.loop.run_until_complete, coro2())
267
268    # Note: because of the default Windows timing granularity of
269    # 15.6 msec, we use fairly long sleep times here (~100 msec).
270
271    def test_run_until_complete(self):
272        t0 = self.loop.time()
273        self.loop.run_until_complete(asyncio.sleep(0.1))
274        t1 = self.loop.time()
275        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
276
277    def test_run_until_complete_stopped(self):
278
279        async def cb():
280            self.loop.stop()
281            await asyncio.sleep(0.1)
282        task = cb()
283        self.assertRaises(RuntimeError,
284                          self.loop.run_until_complete, task)
285
286    def test_call_later(self):
287        results = []
288
289        def callback(arg):
290            results.append(arg)
291            self.loop.stop()
292
293        self.loop.call_later(0.1, callback, 'hello world')
294        t0 = time.monotonic()
295        self.loop.run_forever()
296        t1 = time.monotonic()
297        self.assertEqual(results, ['hello world'])
298        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
299
300    def test_call_soon(self):
301        results = []
302
303        def callback(arg1, arg2):
304            results.append((arg1, arg2))
305            self.loop.stop()
306
307        self.loop.call_soon(callback, 'hello', 'world')
308        self.loop.run_forever()
309        self.assertEqual(results, [('hello', 'world')])
310
    def test_call_soon_threadsafe(self):
        """A callback scheduled from another thread runs before a later one.

        The lock is held by the main thread until the helper thread has
        called call_soon_threadsafe(), so 'hello' is queued before 'world'.
        """
        results = []
        lock = threading.Lock()

        def callback(arg):
            results.append(arg)
            # Stop once both callbacks have run.
            if len(results) >= 2:
                self.loop.stop()

        def run_in_thread():
            self.loop.call_soon_threadsafe(callback, 'hello')
            # Signal the main thread that 'hello' has been scheduled.
            lock.release()

        lock.acquire()
        t = threading.Thread(target=run_in_thread)
        t.start()

        # Blocks until run_in_thread() releases the lock.
        with lock:
            self.loop.call_soon(callback, 'world')
            self.loop.run_forever()
        t.join()
        self.assertEqual(results, ['hello', 'world'])
333
334    def test_call_soon_threadsafe_same_thread(self):
335        results = []
336
337        def callback(arg):
338            results.append(arg)
339            if len(results) >= 2:
340                self.loop.stop()
341
342        self.loop.call_soon_threadsafe(callback, 'hello')
343        self.loop.call_soon(callback, 'world')
344        self.loop.run_forever()
345        self.assertEqual(results, ['hello', 'world'])
346
347    def test_run_in_executor(self):
348        def run(arg):
349            return (arg, threading.get_ident())
350        f2 = self.loop.run_in_executor(None, run, 'yo')
351        res, thread_id = self.loop.run_until_complete(f2)
352        self.assertEqual(res, 'yo')
353        self.assertNotEqual(thread_id, threading.get_ident())
354
    def test_run_in_executor_cancel(self):
        """A cancelled executor future must not schedule work on a closed loop.

        After cancelling the future and closing the loop, call_soon and
        call_soon_threadsafe are replaced by a recorder; it must not have
        been called once the worker function has had time to finish.
        """
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.close()
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        # Wait longer than run() sleeps so the worker has finished.
        time.sleep(0.4)
        self.assertFalse(called)
372
    def test_reader_callback(self):
        """add_reader() invokes the callback whenever the socket is readable.

        Two 3-byte messages are sent through a socket pair; the reader
        accumulates them and unregisters itself when it reads EOF.
        """
        r, w = socket.socketpair()
        r.setblocking(False)
        bytes_read = bytearray()

        def reader():
            try:
                data = r.recv(1024)
            except BlockingIOError:
                # Spurious readiness notifications are possible
                # at least on Linux -- see man select.
                return
            if data:
                bytes_read.extend(data)
            else:
                # Empty read: the writer end was closed.
                self.assertTrue(self.loop.remove_reader(r.fileno()))
                r.close()

        self.loop.add_reader(r.fileno(), reader)
        self.loop.call_soon(w.send, b'abc')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
        self.loop.call_soon(w.send, b'def')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
        self.loop.call_soon(w.close)
        self.loop.call_soon(self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(bytes_read, b'abcdef')
400
    def test_writer_callback(self):
        """add_writer() invokes the callback with its arguments; removal works.

        The writer sends 1024 bytes once and stops the loop; the other end
        of the socket pair must then receive exactly that data.
        """
        r, w = socket.socketpair()
        w.setblocking(False)

        def writer(data):
            w.send(data)
            self.loop.stop()

        data = b'x' * 1024
        self.loop.add_writer(w.fileno(), writer, data)
        self.loop.run_forever()

        # The first removal succeeds, the second finds nothing to remove.
        self.assertTrue(self.loop.remove_writer(w.fileno()))
        self.assertFalse(self.loop.remove_writer(w.fileno()))

        w.close()
        read = r.recv(len(data) * 2)
        r.close()
        self.assertEqual(read, data)
420
421    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
422    def test_add_signal_handler(self):
423        caught = 0
424
425        def my_handler():
426            nonlocal caught
427            caught += 1
428
429        # Check error behavior first.
430        self.assertRaises(
431            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
432        self.assertRaises(
433            TypeError, self.loop.remove_signal_handler, 'boom')
434        self.assertRaises(
435            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
436            my_handler)
437        self.assertRaises(
438            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
439        self.assertRaises(
440            ValueError, self.loop.add_signal_handler, 0, my_handler)
441        self.assertRaises(
442            ValueError, self.loop.remove_signal_handler, 0)
443        self.assertRaises(
444            ValueError, self.loop.add_signal_handler, -1, my_handler)
445        self.assertRaises(
446            ValueError, self.loop.remove_signal_handler, -1)
447        self.assertRaises(
448            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
449            my_handler)
450        # Removing SIGKILL doesn't raise, since we don't call signal().
451        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
452        # Now set a handler and handle it.
453        self.loop.add_signal_handler(signal.SIGINT, my_handler)
454
455        os.kill(os.getpid(), signal.SIGINT)
456        test_utils.run_until(self.loop, lambda: caught)
457
458        # Removing it should restore the default handler.
459        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
460        self.assertEqual(signal.getsignal(signal.SIGINT),
461                         signal.default_int_handler)
462        # Removing again returns False.
463        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
464
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
    def test_signal_handling_while_selecting(self):
        """A SIGALRM arriving while the loop is waiting runs its handler once."""
        # Test with a signal actually arriving during a select() call.
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler)

        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
        # Fallback stop so the test cannot hang if the signal never arrives.
        self.loop.call_later(60, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)
481
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
    def test_signal_handling_args(self):
        """Extra arguments given to add_signal_handler() reach the handler."""
        some_args = (42,)
        caught = 0

        def my_handler(*args):
            nonlocal caught
            caught += 1
            self.assertEqual(args, some_args)
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)

        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
        # Fallback stop so the test cannot hang if the signal never arrives.
        self.loop.call_later(60, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)
499
500    def _basetest_create_connection(self, connection_fut, check_sockname=True):
501        tr, pr = self.loop.run_until_complete(connection_fut)
502        self.assertIsInstance(tr, asyncio.Transport)
503        self.assertIsInstance(pr, asyncio.Protocol)
504        self.assertIs(pr.transport, tr)
505        if check_sockname:
506            self.assertIsNotNone(tr.get_extra_info('sockname'))
507        self.loop.run_until_complete(pr.done)
508        self.assertGreater(pr.nbytes, 0)
509        tr.close()
510
511    def test_create_connection(self):
512        with test_utils.run_test_server() as httpd:
513            conn_fut = self.loop.create_connection(
514                lambda: MyProto(loop=self.loop), *httpd.address)
515            self._basetest_create_connection(conn_fut)
516
517    @support.skip_unless_bind_unix_socket
518    def test_create_unix_connection(self):
519        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
520        # zero-length address for UNIX socket.
521        check_sockname = not broken_unix_getsockname()
522
523        with test_utils.run_test_unix_server() as httpd:
524            conn_fut = self.loop.create_unix_connection(
525                lambda: MyProto(loop=self.loop), httpd.address)
526            self._basetest_create_connection(conn_fut, check_sockname)
527
528    def check_ssl_extra_info(self, client, check_sockname=True,
529                             peername=None, peercert={}):
530        if check_sockname:
531            self.assertIsNotNone(client.get_extra_info('sockname'))
532        if peername:
533            self.assertEqual(peername,
534                             client.get_extra_info('peername'))
535        else:
536            self.assertIsNotNone(client.get_extra_info('peername'))
537        self.assertEqual(peercert,
538                         client.get_extra_info('peercert'))
539
540        # test SSL cipher
541        cipher = client.get_extra_info('cipher')
542        self.assertIsInstance(cipher, tuple)
543        self.assertEqual(len(cipher), 3, cipher)
544        self.assertIsInstance(cipher[0], str)
545        self.assertIsInstance(cipher[1], str)
546        self.assertIsInstance(cipher[2], int)
547
548        # test SSL object
549        sslobj = client.get_extra_info('ssl_object')
550        self.assertIsNotNone(sslobj)
551        self.assertEqual(sslobj.compression(),
552                         client.get_extra_info('compression'))
553        self.assertEqual(sslobj.cipher(),
554                         client.get_extra_info('cipher'))
555        self.assertEqual(sslobj.getpeercert(),
556                         client.get_extra_info('peercert'))
557        self.assertEqual(sslobj.compression(),
558                         client.get_extra_info('compression'))
559
560    def _basetest_create_ssl_connection(self, connection_fut,
561                                        check_sockname=True,
562                                        peername=None):
563        tr, pr = self.loop.run_until_complete(connection_fut)
564        self.assertIsInstance(tr, asyncio.Transport)
565        self.assertIsInstance(pr, asyncio.Protocol)
566        self.assertTrue('ssl' in tr.__class__.__name__.lower())
567        self.check_ssl_extra_info(tr, check_sockname, peername)
568        self.loop.run_until_complete(pr.done)
569        self.assertGreater(pr.nbytes, 0)
570        tr.close()
571
572    def _test_create_ssl_connection(self, httpd, create_connection,
573                                    check_sockname=True, peername=None):
574        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
575        self._basetest_create_ssl_connection(conn_fut, check_sockname,
576                                             peername)
577
578        # ssl.Purpose was introduced in Python 3.4
579        if hasattr(ssl, 'Purpose'):
580            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
581                                          cafile=None, capath=None,
582                                          cadata=None):
583                """
584                A ssl.create_default_context() replacement that doesn't enable
585                cert validation.
586                """
587                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
588                return test_utils.dummy_ssl_context()
589
590            # With ssl=True, ssl.create_default_context() should be called
591            with mock.patch('ssl.create_default_context',
592                            side_effect=_dummy_ssl_create_context) as m:
593                conn_fut = create_connection(ssl=True)
594                self._basetest_create_ssl_connection(conn_fut, check_sockname,
595                                                     peername)
596                self.assertEqual(m.call_count, 1)
597
598        # With the real ssl.create_default_context(), certificate
599        # validation will fail
600        with self.assertRaises(ssl.SSLError) as cm:
601            conn_fut = create_connection(ssl=True)
602            # Ignore the "SSL handshake failed" log in debug mode
603            with test_utils.disable_logger():
604                self._basetest_create_ssl_connection(conn_fut, check_sockname,
605                                                     peername)
606
607        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
608
609    @unittest.skipIf(ssl is None, 'No ssl module')
610    def test_create_ssl_connection(self):
611        with test_utils.run_test_server(use_ssl=True) as httpd:
612            create_connection = functools.partial(
613                self.loop.create_connection,
614                lambda: MyProto(loop=self.loop),
615                *httpd.address)
616            self._test_create_ssl_connection(httpd, create_connection,
617                                             peername=httpd.address)
618
619    @support.skip_unless_bind_unix_socket
620    @unittest.skipIf(ssl is None, 'No ssl module')
621    def test_create_ssl_unix_connection(self):
622        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
623        # zero-length address for UNIX socket.
624        check_sockname = not broken_unix_getsockname()
625
626        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
627            create_connection = functools.partial(
628                self.loop.create_unix_connection,
629                lambda: MyProto(loop=self.loop), httpd.address,
630                server_hostname='127.0.0.1')
631
632            self._test_create_ssl_connection(httpd, create_connection,
633                                             check_sockname,
634                                             peername=httpd.address)
635
636    def test_create_connection_local_addr(self):
637        with test_utils.run_test_server() as httpd:
638            port = support.find_unused_port()
639            f = self.loop.create_connection(
640                lambda: MyProto(loop=self.loop),
641                *httpd.address, local_addr=(httpd.address[0], port))
642            tr, pr = self.loop.run_until_complete(f)
643            expected = pr.transport.get_extra_info('sockname')[1]
644            self.assertEqual(port, expected)
645            tr.close()
646
647    def test_create_connection_local_addr_in_use(self):
648        with test_utils.run_test_server() as httpd:
649            f = self.loop.create_connection(
650                lambda: MyProto(loop=self.loop),
651                *httpd.address, local_addr=httpd.address)
652            with self.assertRaises(OSError) as cm:
653                self.loop.run_until_complete(f)
654            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
655            self.assertIn(str(httpd.address), cm.exception.strerror)
656
    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
        """connect_accepted_socket() serves a socket accepted outside the loop.

        A client thread connects to a listening socket, sends a message and
        waits for the reply.  The accepted connection is handed to the loop,
        whose protocol answers with ``expected_response``.

        server_ssl: SSL context passed to connect_accepted_socket(), or None.
        client_ssl: SSL context used to wrap the client socket, or None.
        """
        loop = self.loop

        class MyProto(MyBaseProto):

            def connection_lost(self, exc):
                super().connection_lost(exc)
                # End the run_forever() call below once the peer is gone.
                loop.call_soon(loop.stop)

            def data_received(self, data):
                super().data_received(data)
                self.transport.write(expected_response)

        lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
        addr = lsock.getsockname()

        message = b'test data'
        response = None
        expected_response = b'roger'

        def client():
            nonlocal response
            try:
                csock = socket.socket()
                if client_ssl is not None:
                    csock = client_ssl.wrap_socket(csock)
                csock.connect(addr)
                csock.sendall(message)
                response = csock.recv(99)
                csock.close()
            except Exception as exc:
                # Report the failure; the assertions below will then fail.
                print(
                    "Failure in client thread in test_connect_accepted_socket",
                    exc)

        thread = threading.Thread(target=client, daemon=True)
        thread.start()

        conn, _ = lsock.accept()
        proto = MyProto(loop=loop)
        proto.loop = loop
        loop.run_until_complete(
            loop.connect_accepted_socket(
                (lambda: proto), conn, ssl=server_ssl))
        loop.run_forever()
        proto.transport.close()
        lsock.close()

        support.join_thread(thread, timeout=1)
        self.assertFalse(thread.is_alive())
        self.assertEqual(proto.state, 'CLOSED')
        self.assertEqual(proto.nbytes, len(message))
        self.assertEqual(response, expected_response)
710
711    @unittest.skipIf(ssl is None, 'No ssl module')
712    def test_ssl_connect_accepted_socket(self):
713        if (sys.platform == 'win32' and
714            sys.version_info < (3, 5) and
715            isinstance(self.loop, proactor_events.BaseProactorEventLoop)
716            ):
717            raise unittest.SkipTest(
718                'SSL not supported with proactor event loops before Python 3.5'
719                )
720
721        server_context = test_utils.simple_server_sslcontext()
722        client_context = test_utils.simple_client_sslcontext()
723
724        self.test_connect_accepted_socket(server_context, client_context)
725
726    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
727        sock = socket.socket()
728        self.addCleanup(sock.close)
729        coro = self.loop.connect_accepted_socket(
730            MyProto, sock, ssl_handshake_timeout=1)
731        with self.assertRaisesRegex(
732                ValueError,
733                'ssl_handshake_timeout is only meaningful with ssl'):
734            self.loop.run_until_complete(coro)
735
    @mock.patch('asyncio.base_events.socket')
    def create_server_multiple_hosts(self, family, hosts, mock_sock):
        """Check that create_server() opens one socket per unique host.

        family: socket.AF_INET or another family (treated as IPv6-style,
            with 4-tuple addresses).
        hosts: host names passed to create_server(); may contain duplicates.
        mock_sock: mock of asyncio.base_events.socket injected by the
            decorator.

        Name resolution and serving are stubbed out, so no real sockets
        should be needed.
        """
        async def getaddrinfo(host, port, *args, **kw):
            if family == socket.AF_INET:
                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
            else:
                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        unique_hosts = set(hosts)

        # NOTE(review): `getsockbyname` is not a real socket method (the real
        # one is getsockname()).  It appears to work only because the socket
        # is a mock and the assertion below uses the same name -- confirm
        # that this is intentional.
        if family == socket.AF_INET:
            mock_sock.socket().getsockbyname.side_effect = [
                (host, 80) for host in unique_hosts]
        else:
            mock_sock.socket().getsockbyname.side_effect = [
                (host, 80, 0, 0) for host in unique_hosts]
        self.loop.getaddrinfo = getaddrinfo_task
        self.loop._start_serving = mock.Mock()
        self.loop._stop_serving = mock.Mock()
        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
        server = self.loop.run_until_complete(f)
        self.addCleanup(server.close)
        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
        self.assertEqual(server_hosts, unique_hosts)
763
764    def test_create_server_multiple_hosts_ipv4(self):
765        self.create_server_multiple_hosts(socket.AF_INET,
766                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
767
768    def test_create_server_multiple_hosts_ipv6(self):
769        self.create_server_multiple_hosts(socket.AF_INET6,
770                                          ['::1', '::2', '::1'])
771
    def test_create_server(self):
        """create_server() accepts a TCP client and delivers its data.

        A blocking client socket connects to the server and sends 3 bytes;
        the protocol must see the connection, the bytes, the peer address
        and finally the close.
        """
        proto = MyProto(self.loop)
        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        host, port = sock.getsockname()
        self.assertEqual(host, '0.0.0.0')
        client = socket.socket()
        client.connect(('127.0.0.1', port))
        client.sendall(b'xxx')

        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # extra info is available
        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
        self.assertEqual('127.0.0.1',
                         proto.transport.get_extra_info('peername')[0])

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)

        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()
807
    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
    def test_create_server_reuse_port(self):
        """SO_REUSEPORT is set on the server socket only with reuse_port=True."""
        # Without reuse_port the option must be off.
        proto = MyProto(self.loop)
        f = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        self.assertFalse(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT))
        server.close()

        test_utils.run_briefly(self.loop)

        # With reuse_port=True the option must be on.
        proto = MyProto(self.loop)
        f = self.loop.create_server(
            lambda: proto, '0.0.0.0', 0, reuse_port=True)
        server = self.loop.run_until_complete(f)
        self.assertEqual(len(server.sockets), 1)
        sock = server.sockets[0]
        self.assertTrue(
            sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEPORT))
        server.close()
833
834    def _make_unix_server(self, factory, **kwargs):
835        path = test_utils.gen_unix_socket_path()
836        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
837
838        f = self.loop.create_unix_server(factory, path, **kwargs)
839        server = self.loop.run_until_complete(f)
840
841        return server, path
842
    @support.skip_unless_bind_unix_socket
    def test_create_unix_server(self):
        """create_unix_server() accepts a UNIX client and delivers its data.

        A blocking AF_UNIX client connects to the server path and sends
        3 bytes; the protocol must see the connection, the bytes and the
        close.
        """
        proto = MyProto(loop=self.loop)
        server, path = self._make_unix_server(lambda: proto)
        self.assertEqual(len(server.sockets), 1)

        client = socket.socket(socket.AF_UNIX)
        client.connect(path)
        client.sendall(b'xxx')

        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)
        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
        self.assertEqual(3, proto.nbytes)

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)

        self.assertEqual('CLOSED', proto.state)

        # the client socket must be closed after to avoid ECONNRESET upon
        # recv()/send() on the serving socket
        client.close()

        # close server
        server.close()
870
871    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
872    def test_create_unix_server_path_socket_error(self):
873        proto = MyProto(loop=self.loop)
874        sock = socket.socket()
875        with sock:
876            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
877            with self.assertRaisesRegex(ValueError,
878                                        'path and sock can not be specified '
879                                        'at the same time'):
880                self.loop.run_until_complete(f)
881
882    def _create_ssl_context(self, certfile, keyfile=None):
883        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
884        sslcontext.options |= ssl.OP_NO_SSLv2
885        sslcontext.load_cert_chain(certfile, keyfile)
886        return sslcontext
887
888    def _make_ssl_server(self, factory, certfile, keyfile=None):
889        sslcontext = self._create_ssl_context(certfile, keyfile)
890
891        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
892        server = self.loop.run_until_complete(f)
893
894        sock = server.sockets[0]
895        host, port = sock.getsockname()
896        self.assertEqual(host, '127.0.0.1')
897        return server, host, port
898
899    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
900        sslcontext = self._create_ssl_context(certfile, keyfile)
901        return self._make_unix_server(factory, ssl=sslcontext)
902
903    @unittest.skipIf(ssl is None, 'No ssl module')
904    def test_create_server_ssl(self):
905        proto = MyProto(loop=self.loop)
906        server, host, port = self._make_ssl_server(
907            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
908
909        f_c = self.loop.create_connection(MyBaseProto, host, port,
910                                          ssl=test_utils.dummy_ssl_context())
911        client, pr = self.loop.run_until_complete(f_c)
912
913        client.write(b'xxx')
914        self.loop.run_until_complete(proto.connected)
915        self.assertEqual('CONNECTED', proto.state)
916
917        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
918        self.assertEqual(3, proto.nbytes)
919
920        # extra info is available
921        self.check_ssl_extra_info(client, peername=(host, port))
922
923        # close connection
924        proto.transport.close()
925        self.loop.run_until_complete(proto.done)
926        self.assertEqual('CLOSED', proto.state)
927
928        # the client socket must be closed after to avoid ECONNRESET upon
929        # recv()/send() on the serving socket
930        client.close()
931
932        # stop serving
933        server.close()
934
935    @support.skip_unless_bind_unix_socket
936    @unittest.skipIf(ssl is None, 'No ssl module')
937    def test_create_unix_server_ssl(self):
938        proto = MyProto(loop=self.loop)
939        server, path = self._make_ssl_unix_server(
940            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
941
942        f_c = self.loop.create_unix_connection(
943            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
944            server_hostname='')
945
946        client, pr = self.loop.run_until_complete(f_c)
947
948        client.write(b'xxx')
949        self.loop.run_until_complete(proto.connected)
950        self.assertEqual('CONNECTED', proto.state)
951        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
952        self.assertEqual(3, proto.nbytes)
953
954        # close connection
955        proto.transport.close()
956        self.loop.run_until_complete(proto.done)
957        self.assertEqual('CLOSED', proto.state)
958
959        # the client socket must be closed after to avoid ECONNRESET upon
960        # recv()/send() on the serving socket
961        client.close()
962
963        # stop serving
964        server.close()
965
966    @unittest.skipIf(ssl is None, 'No ssl module')
967    def test_create_server_ssl_verify_failed(self):
968        proto = MyProto(loop=self.loop)
969        server, host, port = self._make_ssl_server(
970            lambda: proto, test_utils.SIGNED_CERTFILE)
971
972        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
973        sslcontext_client.options |= ssl.OP_NO_SSLv2
974        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
975        if hasattr(sslcontext_client, 'check_hostname'):
976            sslcontext_client.check_hostname = True
977
978
979        # no CA loaded
980        f_c = self.loop.create_connection(MyProto, host, port,
981                                          ssl=sslcontext_client)
982        with mock.patch.object(self.loop, 'call_exception_handler'):
983            with test_utils.disable_logger():
984                with self.assertRaisesRegex(ssl.SSLError,
985                                            '(?i)certificate.verify.failed'):
986                    self.loop.run_until_complete(f_c)
987
988            # execute the loop to log the connection error
989            test_utils.run_briefly(self.loop)
990
991        # close connection
992        self.assertIsNone(proto.transport)
993        server.close()
994
995    @support.skip_unless_bind_unix_socket
996    @unittest.skipIf(ssl is None, 'No ssl module')
997    def test_create_unix_server_ssl_verify_failed(self):
998        proto = MyProto(loop=self.loop)
999        server, path = self._make_ssl_unix_server(
1000            lambda: proto, test_utils.SIGNED_CERTFILE)
1001
1002        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1003        sslcontext_client.options |= ssl.OP_NO_SSLv2
1004        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1005        if hasattr(sslcontext_client, 'check_hostname'):
1006            sslcontext_client.check_hostname = True
1007
1008        # no CA loaded
1009        f_c = self.loop.create_unix_connection(MyProto, path,
1010                                               ssl=sslcontext_client,
1011                                               server_hostname='invalid')
1012        with mock.patch.object(self.loop, 'call_exception_handler'):
1013            with test_utils.disable_logger():
1014                with self.assertRaisesRegex(ssl.SSLError,
1015                                            '(?i)certificate.verify.failed'):
1016                    self.loop.run_until_complete(f_c)
1017
1018            # execute the loop to log the connection error
1019            test_utils.run_briefly(self.loop)
1020
1021        # close connection
1022        self.assertIsNone(proto.transport)
1023        server.close()
1024
1025    @unittest.skipIf(ssl is None, 'No ssl module')
1026    def test_create_server_ssl_match_failed(self):
1027        proto = MyProto(loop=self.loop)
1028        server, host, port = self._make_ssl_server(
1029            lambda: proto, test_utils.SIGNED_CERTFILE)
1030
1031        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1032        sslcontext_client.options |= ssl.OP_NO_SSLv2
1033        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1034        sslcontext_client.load_verify_locations(
1035            cafile=test_utils.SIGNING_CA)
1036        if hasattr(sslcontext_client, 'check_hostname'):
1037            sslcontext_client.check_hostname = True
1038
1039        # incorrect server_hostname
1040        f_c = self.loop.create_connection(MyProto, host, port,
1041                                          ssl=sslcontext_client)
1042        with mock.patch.object(self.loop, 'call_exception_handler'):
1043            with test_utils.disable_logger():
1044                with self.assertRaisesRegex(
1045                        ssl.CertificateError,
1046                        "IP address mismatch, certificate is not valid for "
1047                        "'127.0.0.1'"):
1048                    self.loop.run_until_complete(f_c)
1049
1050        # close connection
1051        # transport is None because TLS ALERT aborted the handshake
1052        self.assertIsNone(proto.transport)
1053        server.close()
1054
1055    @support.skip_unless_bind_unix_socket
1056    @unittest.skipIf(ssl is None, 'No ssl module')
1057    def test_create_unix_server_ssl_verified(self):
1058        proto = MyProto(loop=self.loop)
1059        server, path = self._make_ssl_unix_server(
1060            lambda: proto, test_utils.SIGNED_CERTFILE)
1061
1062        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063        sslcontext_client.options |= ssl.OP_NO_SSLv2
1064        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1065        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1066        if hasattr(sslcontext_client, 'check_hostname'):
1067            sslcontext_client.check_hostname = True
1068
1069        # Connection succeeds with correct CA and server hostname.
1070        f_c = self.loop.create_unix_connection(MyProto, path,
1071                                               ssl=sslcontext_client,
1072                                               server_hostname='localhost')
1073        client, pr = self.loop.run_until_complete(f_c)
1074
1075        # close connection
1076        proto.transport.close()
1077        client.close()
1078        server.close()
1079        self.loop.run_until_complete(proto.done)
1080
1081    @unittest.skipIf(ssl is None, 'No ssl module')
1082    def test_create_server_ssl_verified(self):
1083        proto = MyProto(loop=self.loop)
1084        server, host, port = self._make_ssl_server(
1085            lambda: proto, test_utils.SIGNED_CERTFILE)
1086
1087        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1088        sslcontext_client.options |= ssl.OP_NO_SSLv2
1089        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1090        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1091        if hasattr(sslcontext_client, 'check_hostname'):
1092            sslcontext_client.check_hostname = True
1093
1094        # Connection succeeds with correct CA and server hostname.
1095        f_c = self.loop.create_connection(MyProto, host, port,
1096                                          ssl=sslcontext_client,
1097                                          server_hostname='localhost')
1098        client, pr = self.loop.run_until_complete(f_c)
1099
1100        # extra info is available
1101        self.check_ssl_extra_info(client, peername=(host, port),
1102                                  peercert=test_utils.PEERCERT)
1103
1104        # close connection
1105        proto.transport.close()
1106        client.close()
1107        server.close()
1108        self.loop.run_until_complete(proto.done)
1109
1110    def test_create_server_sock(self):
1111        proto = self.loop.create_future()
1112
1113        class TestMyProto(MyProto):
1114            def connection_made(self, transport):
1115                super().connection_made(transport)
1116                proto.set_result(self)
1117
1118        sock_ob = socket.create_server(('0.0.0.0', 0))
1119
1120        f = self.loop.create_server(TestMyProto, sock=sock_ob)
1121        server = self.loop.run_until_complete(f)
1122        sock = server.sockets[0]
1123        self.assertEqual(sock.fileno(), sock_ob.fileno())
1124
1125        host, port = sock.getsockname()
1126        self.assertEqual(host, '0.0.0.0')
1127        client = socket.socket()
1128        client.connect(('127.0.0.1', port))
1129        client.send(b'xxx')
1130        client.close()
1131        server.close()
1132
1133    def test_create_server_addr_in_use(self):
1134        sock_ob = socket.create_server(('0.0.0.0', 0))
1135
1136        f = self.loop.create_server(MyProto, sock=sock_ob)
1137        server = self.loop.run_until_complete(f)
1138        sock = server.sockets[0]
1139        host, port = sock.getsockname()
1140
1141        f = self.loop.create_server(MyProto, host=host, port=port)
1142        with self.assertRaises(OSError) as cm:
1143            self.loop.run_until_complete(f)
1144        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1145
1146        server.close()
1147
1148    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1149    def test_create_server_dual_stack(self):
1150        f_proto = self.loop.create_future()
1151
1152        class TestMyProto(MyProto):
1153            def connection_made(self, transport):
1154                super().connection_made(transport)
1155                f_proto.set_result(self)
1156
1157        try_count = 0
1158        while True:
1159            try:
1160                port = support.find_unused_port()
1161                f = self.loop.create_server(TestMyProto, host=None, port=port)
1162                server = self.loop.run_until_complete(f)
1163            except OSError as ex:
1164                if ex.errno == errno.EADDRINUSE:
1165                    try_count += 1
1166                    self.assertGreaterEqual(5, try_count)
1167                    continue
1168                else:
1169                    raise
1170            else:
1171                break
1172        client = socket.socket()
1173        client.connect(('127.0.0.1', port))
1174        client.send(b'xxx')
1175        proto = self.loop.run_until_complete(f_proto)
1176        proto.transport.close()
1177        client.close()
1178
1179        f_proto = self.loop.create_future()
1180        client = socket.socket(socket.AF_INET6)
1181        client.connect(('::1', port))
1182        client.send(b'xxx')
1183        proto = self.loop.run_until_complete(f_proto)
1184        proto.transport.close()
1185        client.close()
1186
1187        server.close()
1188
1189    def test_server_close(self):
1190        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1191        server = self.loop.run_until_complete(f)
1192        sock = server.sockets[0]
1193        host, port = sock.getsockname()
1194
1195        client = socket.socket()
1196        client.connect(('127.0.0.1', port))
1197        client.send(b'xxx')
1198        client.close()
1199
1200        server.close()
1201
1202        client = socket.socket()
1203        self.assertRaises(
1204            ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1205        client.close()
1206
1207    def _test_create_datagram_endpoint(self, local_addr, family):
1208        class TestMyDatagramProto(MyDatagramProto):
1209            def __init__(inner_self):
1210                super().__init__(loop=self.loop)
1211
1212            def datagram_received(self, data, addr):
1213                super().datagram_received(data, addr)
1214                self.transport.sendto(b'resp:'+data, addr)
1215
1216        coro = self.loop.create_datagram_endpoint(
1217            TestMyDatagramProto, local_addr=local_addr, family=family)
1218        s_transport, server = self.loop.run_until_complete(coro)
1219        sockname = s_transport.get_extra_info('sockname')
1220        host, port = socket.getnameinfo(
1221            sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)
1222
1223        self.assertIsInstance(s_transport, asyncio.Transport)
1224        self.assertIsInstance(server, TestMyDatagramProto)
1225        self.assertEqual('INITIALIZED', server.state)
1226        self.assertIs(server.transport, s_transport)
1227
1228        coro = self.loop.create_datagram_endpoint(
1229            lambda: MyDatagramProto(loop=self.loop),
1230            remote_addr=(host, port))
1231        transport, client = self.loop.run_until_complete(coro)
1232
1233        self.assertIsInstance(transport, asyncio.Transport)
1234        self.assertIsInstance(client, MyDatagramProto)
1235        self.assertEqual('INITIALIZED', client.state)
1236        self.assertIs(client.transport, transport)
1237
1238        transport.sendto(b'xxx')
1239        test_utils.run_until(self.loop, lambda: server.nbytes)
1240        self.assertEqual(3, server.nbytes)
1241        test_utils.run_until(self.loop, lambda: client.nbytes)
1242
1243        # received
1244        self.assertEqual(8, client.nbytes)
1245
1246        # extra info is available
1247        self.assertIsNotNone(transport.get_extra_info('sockname'))
1248
1249        # close connection
1250        transport.close()
1251        self.loop.run_until_complete(client.done)
1252        self.assertEqual('CLOSED', client.state)
1253        server.transport.close()
1254
1255    def test_create_datagram_endpoint(self):
1256        self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)
1257
1258    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1259    def test_create_datagram_endpoint_ipv6(self):
1260        self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)
1261
1262    def test_create_datagram_endpoint_sock(self):
1263        sock = None
1264        local_address = ('127.0.0.1', 0)
1265        infos = self.loop.run_until_complete(
1266            self.loop.getaddrinfo(
1267                *local_address, type=socket.SOCK_DGRAM))
1268        for family, type, proto, cname, address in infos:
1269            try:
1270                sock = socket.socket(family=family, type=type, proto=proto)
1271                sock.setblocking(False)
1272                sock.bind(address)
1273            except:
1274                pass
1275            else:
1276                break
1277        else:
1278            assert False, 'Can not create socket.'
1279
1280        f = self.loop.create_datagram_endpoint(
1281            lambda: MyDatagramProto(loop=self.loop), sock=sock)
1282        tr, pr = self.loop.run_until_complete(f)
1283        self.assertIsInstance(tr, asyncio.Transport)
1284        self.assertIsInstance(pr, MyDatagramProto)
1285        tr.close()
1286        self.loop.run_until_complete(pr.done)
1287
1288    def test_internal_fds(self):
1289        loop = self.create_event_loop()
1290        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1291            loop.close()
1292            self.skipTest('loop is not a BaseSelectorEventLoop')
1293
1294        self.assertEqual(1, loop._internal_fds)
1295        loop.close()
1296        self.assertEqual(0, loop._internal_fds)
1297        self.assertIsNone(loop._csock)
1298        self.assertIsNone(loop._ssock)
1299
1300    @unittest.skipUnless(sys.platform != 'win32',
1301                         "Don't support pipes for Windows")
1302    def test_read_pipe(self):
1303        proto = MyReadPipeProto(loop=self.loop)
1304
1305        rpipe, wpipe = os.pipe()
1306        pipeobj = io.open(rpipe, 'rb', 1024)
1307
1308        async def connect():
1309            t, p = await self.loop.connect_read_pipe(
1310                lambda: proto, pipeobj)
1311            self.assertIs(p, proto)
1312            self.assertIs(t, proto.transport)
1313            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1314            self.assertEqual(0, proto.nbytes)
1315
1316        self.loop.run_until_complete(connect())
1317
1318        os.write(wpipe, b'1')
1319        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1320        self.assertEqual(1, proto.nbytes)
1321
1322        os.write(wpipe, b'2345')
1323        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1324        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1325        self.assertEqual(5, proto.nbytes)
1326
1327        os.close(wpipe)
1328        self.loop.run_until_complete(proto.done)
1329        self.assertEqual(
1330            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1331        # extra info is available
1332        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1333
1334    @unittest.skipUnless(sys.platform != 'win32',
1335                         "Don't support pipes for Windows")
1336    def test_unclosed_pipe_transport(self):
1337        # This test reproduces the issue #314 on GitHub
1338        loop = self.create_event_loop()
1339        read_proto = MyReadPipeProto(loop=loop)
1340        write_proto = MyWritePipeProto(loop=loop)
1341
1342        rpipe, wpipe = os.pipe()
1343        rpipeobj = io.open(rpipe, 'rb', 1024)
1344        wpipeobj = io.open(wpipe, 'w', 1024)
1345
1346        async def connect():
1347            read_transport, _ = await loop.connect_read_pipe(
1348                lambda: read_proto, rpipeobj)
1349            write_transport, _ = await loop.connect_write_pipe(
1350                lambda: write_proto, wpipeobj)
1351            return read_transport, write_transport
1352
1353        # Run and close the loop without closing the transports
1354        read_transport, write_transport = loop.run_until_complete(connect())
1355        loop.close()
1356
1357        # These 'repr' calls used to raise an AttributeError
1358        # See Issue #314 on GitHub
1359        self.assertIn('open', repr(read_transport))
1360        self.assertIn('open', repr(write_transport))
1361
1362        # Clean up (avoid ResourceWarning)
1363        rpipeobj.close()
1364        wpipeobj.close()
1365        read_transport._pipe = None
1366        write_transport._pipe = None
1367
1368    @unittest.skipUnless(sys.platform != 'win32',
1369                         "Don't support pipes for Windows")
1370    def test_read_pty_output(self):
1371        proto = MyReadPipeProto(loop=self.loop)
1372
1373        master, slave = os.openpty()
1374        master_read_obj = io.open(master, 'rb', 0)
1375
1376        async def connect():
1377            t, p = await self.loop.connect_read_pipe(lambda: proto,
1378                                                     master_read_obj)
1379            self.assertIs(p, proto)
1380            self.assertIs(t, proto.transport)
1381            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1382            self.assertEqual(0, proto.nbytes)
1383
1384        self.loop.run_until_complete(connect())
1385
1386        os.write(slave, b'1')
1387        test_utils.run_until(self.loop, lambda: proto.nbytes)
1388        self.assertEqual(1, proto.nbytes)
1389
1390        os.write(slave, b'2345')
1391        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1392        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1393        self.assertEqual(5, proto.nbytes)
1394
1395        os.close(slave)
1396        proto.transport.close()
1397        self.loop.run_until_complete(proto.done)
1398        self.assertEqual(
1399            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1400        # extra info is available
1401        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1402
1403    @unittest.skipUnless(sys.platform != 'win32',
1404                         "Don't support pipes for Windows")
1405    def test_write_pipe(self):
1406        rpipe, wpipe = os.pipe()
1407        pipeobj = io.open(wpipe, 'wb', 1024)
1408
1409        proto = MyWritePipeProto(loop=self.loop)
1410        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1411        transport, p = self.loop.run_until_complete(connect)
1412        self.assertIs(p, proto)
1413        self.assertIs(transport, proto.transport)
1414        self.assertEqual('CONNECTED', proto.state)
1415
1416        transport.write(b'1')
1417
1418        data = bytearray()
1419        def reader(data):
1420            chunk = os.read(rpipe, 1024)
1421            data += chunk
1422            return len(data)
1423
1424        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1425        self.assertEqual(b'1', data)
1426
1427        transport.write(b'2345')
1428        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1429        self.assertEqual(b'12345', data)
1430        self.assertEqual('CONNECTED', proto.state)
1431
1432        os.close(rpipe)
1433
1434        # extra info is available
1435        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1436
1437        # close connection
1438        proto.transport.close()
1439        self.loop.run_until_complete(proto.done)
1440        self.assertEqual('CLOSED', proto.state)
1441
1442    @unittest.skipUnless(sys.platform != 'win32',
1443                         "Don't support pipes for Windows")
1444    def test_write_pipe_disconnect_on_close(self):
1445        rsock, wsock = socket.socketpair()
1446        rsock.setblocking(False)
1447        pipeobj = io.open(wsock.detach(), 'wb', 1024)
1448
1449        proto = MyWritePipeProto(loop=self.loop)
1450        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1451        transport, p = self.loop.run_until_complete(connect)
1452        self.assertIs(p, proto)
1453        self.assertIs(transport, proto.transport)
1454        self.assertEqual('CONNECTED', proto.state)
1455
1456        transport.write(b'1')
1457        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1458        self.assertEqual(b'1', data)
1459
1460        rsock.close()
1461
1462        self.loop.run_until_complete(proto.done)
1463        self.assertEqual('CLOSED', proto.state)
1464
1465    @unittest.skipUnless(sys.platform != 'win32',
1466                         "Don't support pipes for Windows")
1467    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1468    # older than 10.6 (Snow Leopard)
1469    @support.requires_mac_ver(10, 6)
1470    def test_write_pty(self):
1471        master, slave = os.openpty()
1472        slave_write_obj = io.open(slave, 'wb', 0)
1473
1474        proto = MyWritePipeProto(loop=self.loop)
1475        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1476        transport, p = self.loop.run_until_complete(connect)
1477        self.assertIs(p, proto)
1478        self.assertIs(transport, proto.transport)
1479        self.assertEqual('CONNECTED', proto.state)
1480
1481        transport.write(b'1')
1482
1483        data = bytearray()
1484        def reader(data):
1485            chunk = os.read(master, 1024)
1486            data += chunk
1487            return len(data)
1488
1489        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1490                             timeout=10)
1491        self.assertEqual(b'1', data)
1492
1493        transport.write(b'2345')
1494        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1495                             timeout=10)
1496        self.assertEqual(b'12345', data)
1497        self.assertEqual('CONNECTED', proto.state)
1498
1499        os.close(master)
1500
1501        # extra info is available
1502        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1503
1504        # close connection
1505        proto.transport.close()
1506        self.loop.run_until_complete(proto.done)
1507        self.assertEqual('CLOSED', proto.state)
1508
1509    @unittest.skipUnless(sys.platform != 'win32',
1510                         "Don't support pipes for Windows")
1511    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1512    # older than 10.6 (Snow Leopard)
1513    @support.requires_mac_ver(10, 6)
1514    def test_bidirectional_pty(self):
1515        master, read_slave = os.openpty()
1516        write_slave = os.dup(read_slave)
1517        tty.setraw(read_slave)
1518
1519        slave_read_obj = io.open(read_slave, 'rb', 0)
1520        read_proto = MyReadPipeProto(loop=self.loop)
1521        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
1522                                                   slave_read_obj)
1523        read_transport, p = self.loop.run_until_complete(read_connect)
1524        self.assertIs(p, read_proto)
1525        self.assertIs(read_transport, read_proto.transport)
1526        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1527        self.assertEqual(0, read_proto.nbytes)
1528
1529
1530        slave_write_obj = io.open(write_slave, 'wb', 0)
1531        write_proto = MyWritePipeProto(loop=self.loop)
1532        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
1533                                                     slave_write_obj)
1534        write_transport, p = self.loop.run_until_complete(write_connect)
1535        self.assertIs(p, write_proto)
1536        self.assertIs(write_transport, write_proto.transport)
1537        self.assertEqual('CONNECTED', write_proto.state)
1538
1539        data = bytearray()
1540        def reader(data):
1541            chunk = os.read(master, 1024)
1542            data += chunk
1543            return len(data)
1544
1545        write_transport.write(b'1')
1546        test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
1547        self.assertEqual(b'1', data)
1548        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1549        self.assertEqual('CONNECTED', write_proto.state)
1550
1551        os.write(master, b'a')
1552        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
1553                             timeout=10)
1554        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1555        self.assertEqual(1, read_proto.nbytes)
1556        self.assertEqual('CONNECTED', write_proto.state)
1557
1558        write_transport.write(b'2345')
1559        test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
1560        self.assertEqual(b'12345', data)
1561        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1562        self.assertEqual('CONNECTED', write_proto.state)
1563
1564        os.write(master, b'bcde')
1565        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
1566                             timeout=10)
1567        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1568        self.assertEqual(5, read_proto.nbytes)
1569        self.assertEqual('CONNECTED', write_proto.state)
1570
1571        os.close(master)
1572
1573        read_transport.close()
1574        self.loop.run_until_complete(read_proto.done)
1575        self.assertEqual(
1576            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
1577
1578        write_transport.close()
1579        self.loop.run_until_complete(write_proto.done)
1580        self.assertEqual('CLOSED', write_proto.state)
1581
1582    def test_prompt_cancellation(self):
1583        r, w = socket.socketpair()
1584        r.setblocking(False)
1585        f = self.loop.create_task(self.loop.sock_recv(r, 1))
1586        ov = getattr(f, 'ov', None)
1587        if ov is not None:
1588            self.assertTrue(ov.pending)
1589
1590        async def main():
1591            try:
1592                self.loop.call_soon(f.cancel)
1593                await f
1594            except asyncio.CancelledError:
1595                res = 'cancelled'
1596            else:
1597                res = None
1598            finally:
1599                self.loop.stop()
1600            return res
1601
1602        start = time.monotonic()
1603        t = self.loop.create_task(main())
1604        self.loop.run_forever()
1605        elapsed = time.monotonic() - start
1606
1607        self.assertLess(elapsed, 0.1)
1608        self.assertEqual(t.result(), 'cancelled')
1609        self.assertRaises(asyncio.CancelledError, f.result)
1610        if ov is not None:
1611            self.assertFalse(ov.pending)
1612        self.loop._stop_serving(r)
1613
1614        r.close()
1615        w.close()
1616
1617    def test_timeout_rounding(self):
1618        def _run_once():
1619            self.loop._run_once_counter += 1
1620            orig_run_once()
1621
1622        orig_run_once = self.loop._run_once
1623        self.loop._run_once_counter = 0
1624        self.loop._run_once = _run_once
1625
1626        async def wait():
1627            loop = self.loop
1628            await asyncio.sleep(1e-2)
1629            await asyncio.sleep(1e-4)
1630            await asyncio.sleep(1e-6)
1631            await asyncio.sleep(1e-8)
1632            await asyncio.sleep(1e-10)
1633
1634        self.loop.run_until_complete(wait())
1635        # The ideal number of call is 12, but on some platforms, the selector
1636        # may sleep at little bit less than timeout depending on the resolution
1637        # of the clock used by the kernel. Tolerate a few useless calls on
1638        # these platforms.
1639        self.assertLessEqual(self.loop._run_once_counter, 20,
1640            {'clock_resolution': self.loop._clock_resolution,
1641             'selector': self.loop._selector.__class__.__name__})
1642
1643    def test_remove_fds_after_closing(self):
1644        loop = self.create_event_loop()
1645        callback = lambda: None
1646        r, w = socket.socketpair()
1647        self.addCleanup(r.close)
1648        self.addCleanup(w.close)
1649        loop.add_reader(r, callback)
1650        loop.add_writer(w, callback)
1651        loop.close()
1652        self.assertFalse(loop.remove_reader(r))
1653        self.assertFalse(loop.remove_writer(w))
1654
1655    def test_add_fds_after_closing(self):
1656        loop = self.create_event_loop()
1657        callback = lambda: None
1658        r, w = socket.socketpair()
1659        self.addCleanup(r.close)
1660        self.addCleanup(w.close)
1661        loop.close()
1662        with self.assertRaises(RuntimeError):
1663            loop.add_reader(r, callback)
1664        with self.assertRaises(RuntimeError):
1665            loop.add_writer(w, callback)
1666
1667    def test_close_running_event_loop(self):
1668        async def close_loop(loop):
1669            self.loop.close()
1670
1671        coro = close_loop(self.loop)
1672        with self.assertRaises(RuntimeError):
1673            self.loop.run_until_complete(coro)
1674
1675    def test_close(self):
1676        self.loop.close()
1677
1678        async def test():
1679            pass
1680
1681        func = lambda: False
1682        coro = test()
1683        self.addCleanup(coro.close)
1684
1685        # operation blocked when the loop is closed
1686        with self.assertRaises(RuntimeError):
1687            self.loop.run_forever()
1688        with self.assertRaises(RuntimeError):
1689            fut = self.loop.create_future()
1690            self.loop.run_until_complete(fut)
1691        with self.assertRaises(RuntimeError):
1692            self.loop.call_soon(func)
1693        with self.assertRaises(RuntimeError):
1694            self.loop.call_soon_threadsafe(func)
1695        with self.assertRaises(RuntimeError):
1696            self.loop.call_later(1.0, func)
1697        with self.assertRaises(RuntimeError):
1698            self.loop.call_at(self.loop.time() + .0, func)
1699        with self.assertRaises(RuntimeError):
1700            self.loop.create_task(coro)
1701        with self.assertRaises(RuntimeError):
1702            self.loop.add_signal_handler(signal.SIGTERM, func)
1703
1704        # run_in_executor test is tricky: the method is a coroutine,
1705        # but run_until_complete cannot be called on closed loop.
1706        # Thus iterate once explicitly.
1707        with self.assertRaises(RuntimeError):
1708            it = self.loop.run_in_executor(None, func).__await__()
1709            next(it)
1710
1711
class SubprocessTestsMixin:
    """Subprocess transport/protocol tests shared by the loop test cases.

    The host test case must provide ``self.loop``.  The helper scripts
    (echo.py, echo2.py, echo3.py) are looked up in this file's directory.
    """

    def check_terminated(self, returncode):
        """Assert that *returncode* matches a process ended by terminate()."""
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    def check_killed(self, returncode):
        """Assert that *returncode* matches a process that was killed."""
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_subprocess_exec(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog)
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)
            self.assertEqual('CONNECTED', proto.state)

            stdin = transp.get_pipe_transport(0)
            stdin.write(b'Python The Winner')
            self.loop.run_until_complete(proto.got_data[1].wait())
            # Closing the transport while the child is still running is
            # expected to kill it; silence the resulting log output.
            with test_utils.disable_logger():
                transp.close()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)
            self.assertEqual(b'Python The Winner', proto.data[1])

    def test_subprocess_interactive(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)
            self.assertEqual('CONNECTED', proto.state)

            stdin = transp.get_pipe_transport(0)
            stdin.write(b'Python ')
            self.loop.run_until_complete(proto.got_data[1].wait())
            # Reset the event so the second write can be awaited separately.
            proto.got_data[1].clear()
            self.assertEqual(b'Python ', proto.data[1])

            stdin.write(b'The Winner')
            self.loop.run_until_complete(proto.got_data[1].wait())
            self.assertEqual(b'Python The Winner', proto.data[1])

            with test_utils.disable_logger():
                transp.close()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)

    def test_subprocess_shell(self):
        with self.assertWarns(DeprecationWarning):
            connect = self.loop.subprocess_shell(
                            functools.partial(MySubprocessProtocol, self.loop),
                            'echo Python')
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.get_pipe_transport(0).close()
            self.loop.run_until_complete(proto.completed)
            self.assertEqual(0, proto.returncode)
            self.assertTrue(all(f.done() for f in proto.disconnects.values()))
            # The line terminator differs between shells/platforms.
            self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
            self.assertEqual(proto.data[2], b'')
            transp.close()

    def test_subprocess_exitcode(self):
        connect = self.loop.subprocess_shell(
                        functools.partial(MySubprocessProtocol, self.loop),
                        'exit 7', stdin=None, stdout=None, stderr=None)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_close_after_finish(self):
        connect = self.loop.subprocess_shell(
                        functools.partial(MySubprocessProtocol, self.loop),
                        'exit 7', stdin=None, stdout=None, stderr=None)
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        # No pipes were requested, so no pipe transports exist.
        self.assertIsNone(transp.get_pipe_transport(0))
        self.assertIsNone(transp.get_pipe_transport(1))
        self.assertIsNone(transp.get_pipe_transport(2))
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        self.assertIsNone(transp.close())

    def test_subprocess_kill(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.kill()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)
            transp.close()

    def test_subprocess_terminate(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.terminate()
            self.loop.run_until_complete(proto.completed)
            self.check_terminated(proto.returncode)
            transp.close()

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_subprocess_send_signal(self):
        # bpo-31034: Make sure that we get the default signal handler (killing
        # the process). The parent process may have decided to ignore SIGHUP,
        # and signal handlers are inherited.
        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
        try:
            prog = os.path.join(os.path.dirname(__file__), 'echo.py')

            connect = self.loop.subprocess_exec(
                            functools.partial(MySubprocessProtocol, self.loop),
                            sys.executable, prog)

            with self.assertWarns(DeprecationWarning):
                transp, proto = self.loop.run_until_complete(connect)
                self.assertIsInstance(proto, MySubprocessProtocol)
                self.loop.run_until_complete(proto.connected)

                transp.send_signal(signal.SIGHUP)
                self.loop.run_until_complete(proto.completed)
                self.assertEqual(-signal.SIGHUP, proto.returncode)
                transp.close()
        finally:
            # Restore whatever SIGHUP disposition the test runner had.
            signal.signal(signal.SIGHUP, old_handler)

    def test_subprocess_stderr(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            stdin = transp.get_pipe_transport(0)
            stdin.write(b'test')

            self.loop.run_until_complete(proto.completed)

            transp.close()
            self.assertEqual(b'OUT:test', proto.data[1])
            self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
            self.assertEqual(0, proto.returncode)

    def test_subprocess_stderr_redirect_to_stdout(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog, stderr=subprocess.STDOUT)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            stdin = transp.get_pipe_transport(0)
            # stderr shares the stdout pipe, so it has no transport of its own.
            self.assertIsNotNone(transp.get_pipe_transport(1))
            self.assertIsNone(transp.get_pipe_transport(2))

            stdin.write(b'test')
            self.loop.run_until_complete(proto.completed)
            self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
                            proto.data[1])
            self.assertEqual(b'', proto.data[2])

            transp.close()
            self.assertEqual(0, proto.returncode)

    def test_subprocess_close_client_stream(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')

        connect = self.loop.subprocess_exec(
                        functools.partial(MySubprocessProtocol, self.loop),
                        sys.executable, prog)
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            stdin = transp.get_pipe_transport(0)
            stdout = transp.get_pipe_transport(1)
            stdin.write(b'test')
            self.loop.run_until_complete(proto.got_data[1].wait())
            self.assertEqual(b'OUT:test', proto.data[1])

            # Close our end of the child's stdout, then make it write again.
            stdout.close()
            self.loop.run_until_complete(proto.disconnects[1])
            stdin.write(b'xxx')
            self.loop.run_until_complete(proto.got_data[2].wait())
            if sys.platform != 'win32':
                self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
            else:
                # After closing the read-end of a pipe, writing to the
                # write-end using os.write() fails with errno==EINVAL and
                # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
                # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
                self.assertEqual(b'ERR:OSError', proto.data[2])
            with test_utils.disable_logger():
                transp.close()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)

    def test_subprocess_wait_no_same_group(self):
        # start the new process in a new session
        connect = self.loop.subprocess_shell(
                        functools.partial(MySubprocessProtocol, self.loop),
                        'exit 7', stdin=None, stdout=None, stderr=None,
                        start_new_session=True)
        # This must be a plain call: a ``yield`` here would turn the method
        # into a generator function whose body unittest never executes.
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_exec_invalid_args(self):
        async def connect(**kwds):
            await self.loop.subprocess_exec(
                asyncio.SubprocessProtocol,
                'pwd', **kwds)

        # Each of these keyword arguments must be rejected.
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(universal_newlines=True))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(bufsize=4096))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(shell=True))

    def test_subprocess_shell_invalid_args(self):

        async def connect(cmd=None, **kwds):
            if not cmd:
                cmd = 'pwd'
            await self.loop.subprocess_shell(
                asyncio.SubprocessProtocol,
                cmd, **kwds)

        # A list command and each of these keyword arguments must be rejected.
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(['ls', '-l']))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(universal_newlines=True))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(bufsize=4096))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(shell=False))
2001
2002
2003if sys.platform == 'win32':
2004
2005    class SelectEventLoopTests(EventLoopTestsMixin,
2006                               test_utils.TestCase):
2007
2008        def create_event_loop(self):
2009            return asyncio.SelectorEventLoop()
2010
2011    class ProactorEventLoopTests(EventLoopTestsMixin,
2012                                 SubprocessTestsMixin,
2013                                 test_utils.TestCase):
2014
2015        def create_event_loop(self):
2016            return asyncio.ProactorEventLoop()
2017
2018        def test_reader_callback(self):
2019            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2020
2021        def test_reader_callback_cancel(self):
2022            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2023
2024        def test_writer_callback(self):
2025            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2026
2027        def test_writer_callback_cancel(self):
2028            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2029
2030        def test_remove_fds_after_closing(self):
2031            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2032else:
2033    import selectors
2034
2035    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
2036        def setUp(self):
2037            super().setUp()
2038            watcher = asyncio.SafeChildWatcher()
2039            watcher.attach_loop(self.loop)
2040            asyncio.set_child_watcher(watcher)
2041
2042        def tearDown(self):
2043            asyncio.set_child_watcher(None)
2044            super().tearDown()
2045
2046
2047    if hasattr(selectors, 'KqueueSelector'):
2048        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
2049                                   SubprocessTestsMixin,
2050                                   test_utils.TestCase):
2051
2052            def create_event_loop(self):
2053                return asyncio.SelectorEventLoop(
2054                    selectors.KqueueSelector())
2055
2056            # kqueue doesn't support character devices (PTY) on Mac OS X older
2057            # than 10.9 (Maverick)
2058            @support.requires_mac_ver(10, 9)
2059            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
2060            # hangs on OpenBSD 5.5
2061            @unittest.skipIf(sys.platform.startswith('openbsd'),
2062                             'test hangs on OpenBSD')
2063            def test_read_pty_output(self):
2064                super().test_read_pty_output()
2065
2066            # kqueue doesn't support character devices (PTY) on Mac OS X older
2067            # than 10.9 (Maverick)
2068            @support.requires_mac_ver(10, 9)
2069            def test_write_pty(self):
2070                super().test_write_pty()
2071
2072    if hasattr(selectors, 'EpollSelector'):
2073        class EPollEventLoopTests(UnixEventLoopTestsMixin,
2074                                  SubprocessTestsMixin,
2075                                  test_utils.TestCase):
2076
2077            def create_event_loop(self):
2078                return asyncio.SelectorEventLoop(selectors.EpollSelector())
2079
2080    if hasattr(selectors, 'PollSelector'):
2081        class PollEventLoopTests(UnixEventLoopTestsMixin,
2082                                 SubprocessTestsMixin,
2083                                 test_utils.TestCase):
2084
2085            def create_event_loop(self):
2086                return asyncio.SelectorEventLoop(selectors.PollSelector())
2087
2088    # Should always exist.
2089    class SelectEventLoopTests(UnixEventLoopTestsMixin,
2090                               SubprocessTestsMixin,
2091                               test_utils.TestCase):
2092
2093        def create_event_loop(self):
2094            return asyncio.SelectorEventLoop(selectors.SelectSelector())
2095
2096
def noop(*args, **kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None
2099
2100
class HandleTests(test_utils.TestCase):
    """Tests for asyncio.Handle and for coroutine debug formatting."""

    def setUp(self):
        super().setUp()
        # A mock loop is enough: these tests only need get_debug() and
        # call_exception_handler().
        self.loop = mock.Mock()
        self.loop.get_debug.return_value = True

    def test_handle(self):
        def callback(*args):
            return args

        args = ()
        h = asyncio.Handle(callback, args, self.loop)
        self.assertIs(h._callback, callback)
        self.assertIs(h._args, args)
        self.assertFalse(h.cancelled())

        h.cancel()
        self.assertTrue(h.cancelled())

    def test_callback_with_exception(self):
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        # The exception must be routed to the loop's exception handler
        # instead of propagating out of _run().
        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })

    def test_handle_weakref(self):
        wd = weakref.WeakValueDictionary()
        h = asyncio.Handle(lambda: None, (), self.loop)
        wd['h'] = h  # Would fail without __weakref__ slot.

    def test_handle_repr(self):
        self.loop.get_debug.return_value = False

        # simple function
        h = asyncio.Handle(noop, (1, 2), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<Handle noop(1, 2) at %s:%s>'
                        % (filename, lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<Handle cancelled>')

        # decorated function
        with self.assertWarns(DeprecationWarning):
            cb = asyncio.coroutine(noop)
        h = asyncio.Handle(cb, (), self.loop)
        self.assertEqual(repr(h),
                        '<Handle noop() at %s:%s>'
                        % (filename, lineno))

        # partial function
        cb = functools.partial(noop, 1, 2)
        h = asyncio.Handle(cb, (3,), self.loop)
        regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
                 % (re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

        # partial function with keyword args
        cb = functools.partial(noop, x=1)
        h = asyncio.Handle(cb, (2, 3), self.loop)
        regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
                 % (re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

        # partial method
        # (No interpreter-version guard is needed: this file already
        # requires ``async def`` syntax.)
        method = HandleTests.test_handle_repr
        cb = functools.partialmethod(method)
        filename, lineno = test_utils.get_function_source(method)
        h = asyncio.Handle(cb, (), self.loop)

        cb_regex = r'<function HandleTests.test_handle_repr .*>'
        cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
        regex = (r'^<Handle %s at %s:%s>$'
                 % (cb_regex, re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

    def test_handle_repr_debug(self):
        self.loop.get_debug.return_value = True

        # simple function
        create_filename = __file__
        # The handle MUST be created on the very next line: the expected
        # "created at" line number is computed as current line + 1.
        create_lineno = sys._getframe().f_lineno + 1
        h = asyncio.Handle(noop, (1, 2), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<Handle noop(1, 2) at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(
            repr(h),
            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
            % (filename, lineno, create_filename, create_lineno))

        # double cancellation won't overwrite _repr
        h.cancel()
        self.assertEqual(
            repr(h),
            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
            % (filename, lineno, create_filename, create_lineno))

    def test_handle_source_traceback(self):
        loop = asyncio.get_event_loop_policy().new_event_loop()
        loop.set_debug(True)
        self.set_event_loop(loop)

        def check_source_traceback(h):
            # The handle must have been created on the line directly above
            # the call to this helper, so each scheduling call below has to
            # stay on a single line immediately before its check.
            lineno = sys._getframe(1).f_lineno - 1
            self.assertIsInstance(h._source_traceback, list)
            self.assertEqual(h._source_traceback[-1][:3],
                             (__file__,
                              lineno,
                              'test_handle_source_traceback'))

        # call_soon
        h = loop.call_soon(noop)
        check_source_traceback(h)

        # call_soon_threadsafe
        h = loop.call_soon_threadsafe(noop)
        check_source_traceback(h)

        # call_later
        h = loop.call_later(0, noop)
        check_source_traceback(h)

        # call_at (takes an absolute time on the loop's clock)
        h = loop.call_at(loop.time(), noop)
        check_source_traceback(h)

    @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
                         'No collections.abc.Coroutine')
    def test_coroutine_like_object_debug_formatting(self):
        # Test that asyncio can format coroutines that are instances of
        # collections.abc.Coroutine, but lack cr_code or gi_code attributes
        # (such as ones compiled with Cython).

        coro = CoroLike()
        coro.__name__ = 'AAA'
        self.assertTrue(asyncio.iscoroutine(coro))
        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')

        coro.__qualname__ = 'BBB'
        self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')

        coro.cr_running = True
        self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')

        coro.__name__ = coro.__qualname__ = None
        self.assertEqual(coroutines._format_coroutine(coro),
                         '<CoroLike without __name__>() running')

        coro = CoroLike()
        coro.__qualname__ = 'CoroLike'
        # Some coroutines might not have '__name__', such as
        # built-in async_gen.asend().
        self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')

        coro = CoroLike()
        coro.__qualname__ = 'AAA'
        coro.cr_code = None
        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2280
2281
class TimerTests(unittest.TestCase):
    """Tests for asyncio.TimerHandle."""

    def setUp(self):
        super().setUp()
        self.loop = mock.Mock()

    def test_hash(self):
        when = time.monotonic()
        h = asyncio.TimerHandle(when, lambda: False, (),
                                mock.Mock())
        self.assertEqual(hash(h), hash(when))

    def test_when(self):
        when = time.monotonic()
        h = asyncio.TimerHandle(when, lambda: False, (),
                                mock.Mock())
        self.assertEqual(when, h.when())

    def test_timer(self):
        def callback(*args):
            return args

        args = (1, 2, 3)
        when = time.monotonic()
        h = asyncio.TimerHandle(when, callback, args, mock.Mock())
        self.assertIs(h._callback, callback)
        self.assertIs(h._args, args)
        self.assertFalse(h.cancelled())

        # cancel
        h.cancel()
        self.assertTrue(h.cancelled())
        self.assertIsNone(h._callback)
        self.assertIsNone(h._args)

        # when cannot be None
        self.assertRaises(AssertionError,
                          asyncio.TimerHandle, None, callback, args,
                          self.loop)

    def test_timer_repr(self):
        self.loop.get_debug.return_value = False

        # simple function
        h = asyncio.TimerHandle(123, noop, (), self.loop)
        src = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<TimerHandle when=123 noop() at %s:%s>' % src)

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<TimerHandle cancelled when=123>')

    def test_timer_repr_debug(self):
        self.loop.get_debug.return_value = True

        # simple function
        create_filename = __file__
        # The handle MUST be created on the very next line: the expected
        # "created at" line number is computed as current line + 1.
        create_lineno = sys._getframe().f_lineno + 1
        h = asyncio.TimerHandle(123, noop, (), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<TimerHandle when=123 noop() '
                        'at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<TimerHandle cancelled when=123 noop() '
                        'at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

    def test_timer_comparison(self):
        def callback(*args):
            return args

        when = time.monotonic()

        # Same deadline: the handles are equal and neither sorts first.
        # Comparisons expected to hold use the dedicated assert methods so
        # a failure reports both operands.  Comparisons expected NOT to
        # hold stay as assertFalse(a OP b): each operator must be exercised
        # itself, and unittest has no negated ordering assertions.
        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
        h2 = asyncio.TimerHandle(when, callback, (), self.loop)
        self.assertFalse(h1 < h2)
        self.assertFalse(h2 < h1)
        self.assertLessEqual(h1, h2)
        self.assertLessEqual(h2, h1)
        self.assertFalse(h1 > h2)
        self.assertFalse(h2 > h1)
        self.assertGreaterEqual(h1, h2)
        self.assertGreaterEqual(h2, h1)
        self.assertEqual(h1, h2)
        self.assertFalse(h1 != h2)

        # A cancelled handle no longer compares equal to a live one.
        h2.cancel()
        self.assertFalse(h1 == h2)

        # Different deadlines: h1 fires strictly before h2.
        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
        h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
        self.assertLess(h1, h2)
        self.assertFalse(h2 < h1)
        self.assertLessEqual(h1, h2)
        self.assertFalse(h2 <= h1)
        self.assertFalse(h1 > h2)
        self.assertGreater(h2, h1)
        self.assertFalse(h1 >= h2)
        self.assertGreaterEqual(h2, h1)
        self.assertFalse(h1 == h2)
        self.assertNotEqual(h1, h2)

        # Comparing against a plain Handle is not supported.
        h3 = asyncio.Handle(callback, (), self.loop)
        self.assertIs(NotImplemented, h1.__eq__(h3))
        self.assertIs(NotImplemented, h1.__ne__(h3))
2396
2397
class AbstractEventLoopTests(unittest.TestCase):
    """Check that AbstractEventLoop leaves its methods unimplemented."""

    def test_not_implemented(self):
        """Every directly callable method raises NotImplementedError."""
        f = mock.Mock()
        loop = asyncio.AbstractEventLoop()
        # (method name, positional arguments) for each call to check.
        calls = [
            ('run_forever', ()),
            ('run_until_complete', (None,)),
            ('stop', ()),
            ('is_running', ()),
            ('is_closed', ()),
            ('close', ()),
            ('create_task', (None,)),
            ('call_later', (None, None)),
            ('call_at', (f, f)),
            ('call_soon', (None,)),
            ('time', ()),
            ('call_soon_threadsafe', (None,)),
            ('set_default_executor', (f,)),
            ('add_reader', (1, f)),
            ('remove_reader', (1,)),
            ('add_writer', (1, f)),
            ('remove_writer', (1,)),
            ('add_signal_handler', (1, f)),
            ('remove_signal_handler', (1,)),
            ('set_exception_handler', (f,)),
            ('default_exception_handler', (f,)),
            ('call_exception_handler', (f,)),
            ('get_debug', ()),
            ('set_debug', (f,)),
        ]
        for name, args in calls:
            # subTest() names the offending method in a failure report.
            with self.subTest(method=name):
                self.assertRaises(
                    NotImplementedError, getattr(loop, name), *args)

    def test_not_implemented_async(self):
        """Every awaited method raises NotImplementedError."""

        async def inner():
            f = mock.Mock()
            loop = asyncio.AbstractEventLoop()
            # (method name, positional arguments) for each call to check.
            calls = [
                ('run_in_executor', (f, f)),
                ('getaddrinfo', ('localhost', 8080)),
                ('getnameinfo', (('localhost', 8080),)),
                ('create_connection', (f,)),
                ('create_server', (f,)),
                ('create_datagram_endpoint', (f,)),
                ('sock_recv', (f, 10)),
                ('sock_recv_into', (f, 10)),
                ('sock_sendall', (f, 10)),
                ('sock_connect', (f, f)),
                ('sock_accept', (f,)),
                ('sock_sendfile', (f, f)),
                ('sendfile', (f, f)),
                ('connect_read_pipe', (f, mock.sentinel.pipe)),
                ('connect_write_pipe', (f, mock.sentinel.pipe)),
                ('subprocess_shell', (f, mock.sentinel)),
                ('subprocess_exec', (f,)),
            ]
            for name, args in calls:
                with self.subTest(method=name):
                    with self.assertRaises(NotImplementedError):
                        await getattr(loop, name)(*args)

        loop = asyncio.new_event_loop()
        # Registered as a cleanup so that the loop is closed even when an
        # assertion inside inner() fails.
        self.addCleanup(loop.close)
        loop.run_until_complete(inner())
2498
2499
class PolicyTests(unittest.TestCase):
    """Tests for the event loop policy classes and accessor functions."""

    def test_event_loop_policy(self):
        # The abstract policy implements none of its methods.
        policy = asyncio.AbstractEventLoopPolicy()
        self.assertRaises(NotImplementedError, policy.get_event_loop)
        self.assertRaises(NotImplementedError, policy.set_event_loop, object())
        self.assertRaises(NotImplementedError, policy.new_event_loop)
        self.assertRaises(NotImplementedError, policy.get_child_watcher)
        self.assertRaises(NotImplementedError, policy.set_child_watcher,
                          object())

    def test_get_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()
        self.assertIsNone(policy._local._loop)

        loop = policy.get_event_loop()
        # Close the loop even if one of the assertions below fails.
        self.addCleanup(loop.close)
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)

        self.assertIs(policy._local._loop, loop)
        self.assertIs(loop, policy.get_event_loop())

    def test_get_event_loop_calls_set_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        with mock.patch.object(
                policy, "set_event_loop",
                wraps=policy.set_event_loop) as m_set_event_loop:

            loop = policy.get_event_loop()
            self.addCleanup(loop.close)

            # policy._local._loop must be set through .set_event_loop()
            # (the unix DefaultEventLoopPolicy needs this call to attach
            # the child watcher correctly)
            m_set_event_loop.assert_called_with(loop)

    def test_get_event_loop_after_set_none(self):
        policy = asyncio.DefaultEventLoopPolicy()
        policy.set_event_loop(None)
        self.assertRaises(RuntimeError, policy.get_event_loop)

    @mock.patch('asyncio.events.threading.current_thread')
    def test_get_event_loop_thread(self, m_current_thread):
        # An exception raised in a worker thread does not fail the test by
        # itself, so it is collected here and checked after join().
        failures = []

        def f():
            try:
                policy = asyncio.DefaultEventLoopPolicy()
                self.assertRaises(RuntimeError, policy.get_event_loop)
            except Exception as exc:
                failures.append(exc)

        th = threading.Thread(target=f)
        th.start()
        th.join()
        self.assertEqual(failures, [])

    def test_new_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()

        loop = policy.new_event_loop()
        self.addCleanup(loop.close)
        self.assertIsInstance(loop, asyncio.AbstractEventLoop)

    def test_set_event_loop(self):
        policy = asyncio.DefaultEventLoopPolicy()
        old_loop = policy.get_event_loop()
        self.addCleanup(old_loop.close)

        self.assertRaises(AssertionError, policy.set_event_loop, object())

        loop = policy.new_event_loop()
        # Cleanups run last-in first-out: loop is closed before old_loop.
        self.addCleanup(loop.close)
        policy.set_event_loop(loop)
        self.assertIs(loop, policy.get_event_loop())
        self.assertIsNot(old_loop, policy.get_event_loop())

    def test_get_event_loop_policy(self):
        policy = asyncio.get_event_loop_policy()
        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
        self.assertIs(policy, asyncio.get_event_loop_policy())

    def test_set_event_loop_policy(self):
        self.assertRaises(
            AssertionError, asyncio.set_event_loop_policy, object())

        old_policy = asyncio.get_event_loop_policy()
        # Put the previous policy back so that the replacement installed
        # below does not leak into tests that run afterwards.
        self.addCleanup(asyncio.set_event_loop_policy, old_policy)

        policy = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        self.assertIs(policy, asyncio.get_event_loop_policy())
        self.assertIsNot(policy, old_policy)
2589
2590
class GetEventLoopTestsMixin:
    """Shared tests for get_event_loop() and get_running_loop().

    A concrete test case sets the four ``*_impl`` attributes to one
    implementation of these functions.  setUp() installs them in both
    the asyncio.events module and the asyncio package, and tearDown()
    puts back the functions that were there before.
    """

    _get_running_loop_impl = None
    _set_running_loop_impl = None
    get_running_loop_impl = None
    get_event_loop_impl = None

    def setUp(self):
        # Remember the current functions so that tearDown() can restore them.
        self._get_running_loop_saved = events._get_running_loop
        self._set_running_loop_saved = events._set_running_loop
        self.get_running_loop_saved = events.get_running_loop
        self.get_event_loop_saved = events.get_event_loop

        # Read the implementations from the class, not the instance, so
        # that they are not turned into bound methods.
        events._get_running_loop = type(self)._get_running_loop_impl
        events._set_running_loop = type(self)._set_running_loop_impl
        events.get_running_loop = type(self).get_running_loop_impl
        events.get_event_loop = type(self).get_event_loop_impl

        asyncio._get_running_loop = type(self)._get_running_loop_impl
        asyncio._set_running_loop = type(self)._set_running_loop_impl
        asyncio.get_running_loop = type(self).get_running_loop_impl
        asyncio.get_event_loop = type(self).get_event_loop_impl

        super().setUp()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        if sys.platform != 'win32':
            watcher = asyncio.SafeChildWatcher()
            watcher.attach_loop(self.loop)
            asyncio.set_child_watcher(watcher)

    def tearDown(self):
        try:
            if sys.platform != 'win32':
                asyncio.set_child_watcher(None)

            super().tearDown()
        finally:
            # Always undo what setUp() did, even if the steps above fail.
            self.loop.close()
            asyncio.set_event_loop(None)

            events._get_running_loop = self._get_running_loop_saved
            events._set_running_loop = self._set_running_loop_saved
            events.get_running_loop = self.get_running_loop_saved
            events.get_event_loop = self.get_event_loop_saved

            asyncio._get_running_loop = self._get_running_loop_saved
            asyncio._set_running_loop = self._set_running_loop_saved
            asyncio.get_running_loop = self.get_running_loop_saved
            asyncio.get_event_loop = self.get_event_loop_saved

    if sys.platform != 'win32':

        def test_get_event_loop_new_process(self):
            # bpo-32126: The multiprocessing module used by
            # ProcessPoolExecutor is not functional when the
            # multiprocessing.synchronize module cannot be imported.
            support.skip_if_broken_multiprocessing_synchronize()

            async def main():
                # Leaving the with block shuts the pool down, also when
                # the call in the executor fails.
                with concurrent.futures.ProcessPoolExecutor() as pool:
                    return await self.loop.run_in_executor(
                        pool, _test_get_event_loop_new_process__sub_proc)

            self.assertEqual(
                self.loop.run_until_complete(main()),
                'hello')

    def test_get_event_loop_returns_running_loop(self):
        """get_event_loop() asks the policy only when no loop is running."""
        class TestError(Exception):
            pass

        class Policy(asyncio.DefaultEventLoopPolicy):
            def get_event_loop(self):
                raise TestError

        old_policy = asyncio.get_event_loop_policy()
        # Bound before the try block so that the finally clause can test
        # it even if installing the policy or creating the loop fails.
        loop = None
        try:
            asyncio.set_event_loop_policy(Policy())
            loop = asyncio.new_event_loop()

            # No loop is running: the call reaches the policy.
            with self.assertRaises(TestError):
                asyncio.get_event_loop()
            asyncio.set_event_loop(None)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

            # get_running_loop() raises rather than returning None.
            with self.assertRaisesRegex(RuntimeError, 'no running'):
                asyncio.get_running_loop()
            self.assertIs(asyncio._get_running_loop(), None)

            async def func():
                # While the loop runs, the policy is not consulted.
                self.assertIs(asyncio.get_event_loop(), loop)
                self.assertIs(asyncio.get_running_loop(), loop)
                self.assertIs(asyncio._get_running_loop(), loop)

            loop.run_until_complete(func())

            asyncio.set_event_loop(loop)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

            asyncio.set_event_loop(None)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

        finally:
            asyncio.set_event_loop_policy(old_policy)
            if loop is not None:
                loop.close()

        with self.assertRaisesRegex(RuntimeError, 'no running'):
            asyncio.get_running_loop()

        self.assertIs(asyncio._get_running_loop(), None)
2710
2711
class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
    """Run the mixin's tests against the events._py_* functions."""

    # Public accessors.
    get_event_loop_impl = events._py_get_event_loop
    get_running_loop_impl = events._py_get_running_loop

    # Private accessors.
    _set_running_loop_impl = events._py__set_running_loop
    _get_running_loop_impl = events._py__get_running_loop
2718
2719
try:
    import _asyncio  # NoQA
except ImportError:
    # _asyncio could not be imported: do not define the test case for
    # the events._c_* functions.
    pass
else:

    class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
        """Run the mixin's tests against the events._c_* functions."""

        # Public accessors.
        get_event_loop_impl = events._c_get_event_loop
        get_running_loop_impl = events._c_get_running_loop

        # Private accessors.
        _set_running_loop_impl = events._c__set_running_loop
        _get_running_loop_impl = events._c__get_running_loop
2732
2733
class TestServer(unittest.TestCase):
    """Tests for the server object returned by loop.create_server()."""

    def test_get_loop(self):
        """get_loop() returns the loop the server was created with."""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)
        proto = MyProto(loop)
        server = loop.run_until_complete(
            loop.create_server(lambda: proto, '0.0.0.0', 0))
        try:
            self.assertEqual(server.get_loop(), loop)
        finally:
            # Close the server even if the assertion above fails.
            server.close()
            loop.run_until_complete(server.wait_closed())
2744
2745
class TestAbstractServer(unittest.TestCase):
    """Every AbstractServer method must raise NotImplementedError."""

    def test_close(self):
        server = events.AbstractServer()
        self.assertRaises(NotImplementedError, server.close)

    def test_wait_closed(self):
        event_loop = asyncio.new_event_loop()
        self.addCleanup(event_loop.close)

        def wait():
            # Both creating and running the awaitable happen inside the
            # callable checked by assertRaises().
            event_loop.run_until_complete(
                events.AbstractServer().wait_closed())

        self.assertRaises(NotImplementedError, wait)

    def test_get_loop(self):
        server = events.AbstractServer()
        self.assertRaises(NotImplementedError, server.get_loop)
2762
2763
if __name__ == "__main__":
    # Run the tests when this file is executed as a script.
    unittest.main()
2766