• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for events.py."""
2
3import collections.abc
4import concurrent.futures
5import functools
6import io
7import os
8import platform
9import re
10import signal
11import socket
12try:
13    import ssl
14except ImportError:
15    ssl = None
16import subprocess
17import sys
18import threading
19import time
20import errno
21import unittest
22from unittest import mock
23import weakref
24
25if sys.platform != 'win32':
26    import tty
27
28import asyncio
29from asyncio import coroutines
30from asyncio import events
31from asyncio import proactor_events
32from asyncio import selector_events
33from test.test_asyncio import utils as test_utils
34from test import support
35
36
def tearDownModule():
    """Reset the global asyncio event loop policy after this module's tests."""
    asyncio.set_event_loop_policy(None)
39
40
def broken_unix_getsockname():
    """Return True on platforms where the UNIX-socket sockname check is skipped.

    That is AIX (any version) and Mac OS X releases older than 10.5; every
    other platform returns False.  Callers use the negated result as their
    ``check_sockname`` flag.
    """
    if sys.platform.startswith("aix"):
        return True
    if sys.platform != 'darwin':
        return False
    # mac_ver()[0] is the dotted release string; compare it numerically.
    release = platform.mac_ver()[0]
    return tuple(int(part) for part in release.split('.')) < (10, 5)
50
51
52def _test_get_event_loop_new_process__sub_proc():
53    async def doit():
54        return 'hello'
55
56    loop = asyncio.new_event_loop()
57    asyncio.set_event_loop(loop)
58    return loop.run_until_complete(doit())
59
60
class CoroLike:
    """Inert object exposing the coroutine method names.

    Every method accepts its arguments and returns None.
    """

    def send(self, v):
        return None

    def throw(self, *exc):
        return None

    def close(self):
        return None

    def __await__(self):
        return None
73
74
class MyBaseProto(asyncio.Protocol):
    """Stream protocol stub that tracks its state and the bytes it received.

    ``state`` moves INITIAL -> CONNECTED -> (EOF) -> CLOSED and every callback
    asserts that it is invoked from a legal state.  When a loop is supplied,
    ``connected`` and ``done`` are futures resolved on connection and on
    connection loss; otherwise both stay None.
    """
    # Class-level fallbacks used when no loop is given to __init__.
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is None:
            return
        self.connected = loop.create_future()
        self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        waiter = self.connected
        if waiter:
            waiter.set_result(None)

    def data_received(self, data):
        assert self.state == 'CONNECTED', self.state
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
107
108
class MyProto(MyBaseProto):
    """MyBaseProto variant that writes an HTTP/1.0 GET as soon as it connects."""

    def connection_made(self, transport):
        # Let the base class record the transport and update the state first.
        super().connection_made(transport)
        request = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
        transport.write(request)
113
114
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol stub that tracks its state and the bytes it received.

    ``state`` moves INITIAL -> INITIALIZED -> CLOSED.  When a loop is
    supplied, ``done`` is a future resolved on connection loss.
    """
    # Class-level fallback used when no loop is given to __init__.
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is None:
            return
        self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        assert self.state == 'INITIALIZED', self.state
        self.nbytes = self.nbytes + len(data)

    def error_received(self, exc):
        # Errors are only legal while the endpoint is open; nothing is stored.
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
141
142
class MyReadPipeProto(asyncio.Protocol):
    """Read-pipe protocol stub that records the history of its states.

    ``state`` is a list that grows from ['INITIAL'] to
    ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'].  When a loop is supplied,
    ``done`` is a future resolved on connection loss.
    """
    # Class-level fallback used when no loop is given to __init__.
    done = None

    def __init__(self, loop=None):
        self.state = ['INITIAL']
        self.nbytes = 0
        self.transport = None
        if loop is None:
            return
        self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == ['INITIAL'], self.state
        self.state.append('CONNECTED')

    def data_received(self, data):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        assert self.state == ['INITIAL', 'CONNECTED'], self.state
        self.state.append('EOF')

    def connection_lost(self, exc):
        history = self.state
        if 'EOF' not in history:
            history.append('EOF')  # It is okay if EOF is missed.
        assert self.state == ['INITIAL', 'CONNECTED', 'EOF'], self.state
        history.append('CLOSED')
        waiter = self.done
        if waiter:
            waiter.set_result(None)
173
174
class MyWritePipeProto(asyncio.BaseProtocol):
    """Write-pipe protocol stub tracking INITIAL -> CONNECTED -> CLOSED.

    When a loop is supplied, ``done`` is a future resolved on connection loss.
    """
    # Class-level fallback used when no loop is given to __init__.
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.transport = None
        if loop is None:
            return
        self.done = loop.create_future()

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
194
195
class MySubprocessProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol stub that records pipe data and lifecycle events.

    Attributes set up by __init__:
      connected / completed -- futures resolved on connection made / lost
      disconnects           -- one future per fd 0..2, resolved when that
                               pipe is lost (with the exception, if any)
      data                  -- bytes accumulated per fd (1 and 2)
      got_data              -- per-fd Event set whenever data arrives
      returncode            -- filled in by process_exited()
    """

    def __init__(self, loop):
        self.state = 'INITIAL'
        self.transport = None
        self.returncode = None
        self.connected = loop.create_future()
        self.completed = loop.create_future()
        self.disconnects = {fd: loop.create_future() for fd in range(3)}
        self.data = dict.fromkeys((1, 2), b'')
        self.got_data = {fd: asyncio.Event(loop=loop) for fd in (1, 2)}

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        self.connected.set_result(None)

    def connection_lost(self, exc):
        assert self.state == 'CONNECTED', self.state
        self.state = 'CLOSED'
        self.completed.set_result(None)

    def pipe_data_received(self, fd, data):
        assert self.state == 'CONNECTED', self.state
        self.data[fd] = self.data[fd] + data
        self.got_data[fd].set()

    def pipe_connection_lost(self, fd, exc):
        assert self.state == 'CONNECTED', self.state
        waiter = self.disconnects[fd]
        if exc:
            waiter.set_exception(exc)
        else:
            # exc is falsy here; it is stored as the future's result.
            waiter.set_result(exc)

    def process_exited(self):
        assert self.state == 'CONNECTED', self.state
        self.returncode = self.transport.get_returncode()
235
236
237class EventLoopTestsMixin:
238
239    def setUp(self):
240        super().setUp()
241        self.loop = self.create_event_loop()
242        self.set_event_loop(self.loop)
243
244    def tearDown(self):
245        # just in case if we have transport close callbacks
246        if not self.loop.is_closed():
247            test_utils.run_briefly(self.loop)
248
249        self.doCleanups()
250        support.gc_collect()
251        super().tearDown()
252
253    def test_run_until_complete_nesting(self):
254        async def coro1():
255            await asyncio.sleep(0)
256
257        async def coro2():
258            self.assertTrue(self.loop.is_running())
259            self.loop.run_until_complete(coro1())
260
261        self.assertRaises(
262            RuntimeError, self.loop.run_until_complete, coro2())
263
264    # Note: because of the default Windows timing granularity of
265    # 15.6 msec, we use fairly long sleep times here (~100 msec).
266
267    def test_run_until_complete(self):
268        t0 = self.loop.time()
269        self.loop.run_until_complete(asyncio.sleep(0.1))
270        t1 = self.loop.time()
271        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
272
273    def test_run_until_complete_stopped(self):
274
275        async def cb():
276            self.loop.stop()
277            await asyncio.sleep(0.1)
278        task = cb()
279        self.assertRaises(RuntimeError,
280                          self.loop.run_until_complete, task)
281
282    def test_call_later(self):
283        results = []
284
285        def callback(arg):
286            results.append(arg)
287            self.loop.stop()
288
289        self.loop.call_later(0.1, callback, 'hello world')
290        t0 = time.monotonic()
291        self.loop.run_forever()
292        t1 = time.monotonic()
293        self.assertEqual(results, ['hello world'])
294        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
295
296    def test_call_soon(self):
297        results = []
298
299        def callback(arg1, arg2):
300            results.append((arg1, arg2))
301            self.loop.stop()
302
303        self.loop.call_soon(callback, 'hello', 'world')
304        self.loop.run_forever()
305        self.assertEqual(results, [('hello', 'world')])
306
    def test_call_soon_threadsafe(self):
        # One callback is scheduled from a helper thread with
        # call_soon_threadsafe() and one from this thread with call_soon();
        # the lock makes this thread wait until 'hello' has been queued.
        results = []
        lock = threading.Lock()

        def callback(arg):
            results.append(arg)
            # Stop only once both callbacks have run.
            if len(results) >= 2:
                self.loop.stop()

        def run_in_thread():
            self.loop.call_soon_threadsafe(callback, 'hello')
            # Unblock the "with lock" in the main thread.
            lock.release()

        # Acquired here, released by the helper thread after it has queued
        # its callback.
        lock.acquire()
        t = threading.Thread(target=run_in_thread)
        t.start()

        with lock:
            self.loop.call_soon(callback, 'world')
            self.loop.run_forever()
        t.join()
        self.assertEqual(results, ['hello', 'world'])
329
330    def test_call_soon_threadsafe_same_thread(self):
331        results = []
332
333        def callback(arg):
334            results.append(arg)
335            if len(results) >= 2:
336                self.loop.stop()
337
338        self.loop.call_soon_threadsafe(callback, 'hello')
339        self.loop.call_soon(callback, 'world')
340        self.loop.run_forever()
341        self.assertEqual(results, ['hello', 'world'])
342
343    def test_run_in_executor(self):
344        def run(arg):
345            return (arg, threading.get_ident())
346        f2 = self.loop.run_in_executor(None, run, 'yo')
347        res, thread_id = self.loop.run_until_complete(f2)
348        self.assertEqual(res, 'yo')
349        self.assertNotEqual(thread_id, threading.get_ident())
350
    def test_run_in_executor_cancel(self):
        # After the executor future is cancelled and the loop closed, the
        # worker finishing must not schedule anything on the loop.
        called = False

        def patched_call_soon(*args):
            # Records any attempt to schedule a callback on the loop.
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.close()
        # Replace both scheduling entry points on the (closed) loop with
        # the recorder above.
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        # Wait well past the worker's 0.05 s sleep.
        time.sleep(0.4)
        self.assertFalse(called)
368
    def test_reader_callback(self):
        # Exercise add_reader()/remove_reader() on one end of a socket pair.
        r, w = socket.socketpair()
        r.setblocking(False)
        bytes_read = bytearray()

        def reader():
            try:
                data = r.recv(1024)
            except BlockingIOError:
                # Spurious readiness notifications are possible
                # at least on Linux -- see man select.
                return
            if data:
                bytes_read.extend(data)
            else:
                # Empty read: unregister the callback (which must report
                # that a reader was registered) and close the read end.
                self.assertTrue(self.loop.remove_reader(r.fileno()))
                r.close()

        self.loop.add_reader(r.fileno(), reader)
        # Send two 3-byte chunks, each time running the loop until the
        # reader has accumulated them.
        self.loop.call_soon(w.send, b'abc')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
        self.loop.call_soon(w.send, b'def')
        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
        # Close the write end, then stop the loop.
        self.loop.call_soon(w.close)
        self.loop.call_soon(self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(bytes_read, b'abcdef')
396
397    def test_writer_callback(self):
398        r, w = socket.socketpair()
399        w.setblocking(False)
400
401        def writer(data):
402            w.send(data)
403            self.loop.stop()
404
405        data = b'x' * 1024
406        self.loop.add_writer(w.fileno(), writer, data)
407        self.loop.run_forever()
408
409        self.assertTrue(self.loop.remove_writer(w.fileno()))
410        self.assertFalse(self.loop.remove_writer(w.fileno()))
411
412        w.close()
413        read = r.recv(len(data) * 2)
414        r.close()
415        self.assertEqual(read, data)
416
417    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
418    def test_add_signal_handler(self):
419        caught = 0
420
421        def my_handler():
422            nonlocal caught
423            caught += 1
424
425        # Check error behavior first.
426        self.assertRaises(
427            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
428        self.assertRaises(
429            TypeError, self.loop.remove_signal_handler, 'boom')
430        self.assertRaises(
431            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
432            my_handler)
433        self.assertRaises(
434            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
435        self.assertRaises(
436            ValueError, self.loop.add_signal_handler, 0, my_handler)
437        self.assertRaises(
438            ValueError, self.loop.remove_signal_handler, 0)
439        self.assertRaises(
440            ValueError, self.loop.add_signal_handler, -1, my_handler)
441        self.assertRaises(
442            ValueError, self.loop.remove_signal_handler, -1)
443        self.assertRaises(
444            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
445            my_handler)
446        # Removing SIGKILL doesn't raise, since we don't call signal().
447        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
448        # Now set a handler and handle it.
449        self.loop.add_signal_handler(signal.SIGINT, my_handler)
450
451        os.kill(os.getpid(), signal.SIGINT)
452        test_utils.run_until(self.loop, lambda: caught)
453
454        # Removing it should restore the default handler.
455        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
456        self.assertEqual(signal.getsignal(signal.SIGINT),
457                         signal.default_int_handler)
458        # Removing again returns False.
459        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
460
461    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
462    def test_signal_handling_while_selecting(self):
463        # Test with a signal actually arriving during a select() call.
464        caught = 0
465
466        def my_handler():
467            nonlocal caught
468            caught += 1
469            self.loop.stop()
470
471        self.loop.add_signal_handler(signal.SIGALRM, my_handler)
472
473        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
474        self.loop.call_later(60, self.loop.stop)
475        self.loop.run_forever()
476        self.assertEqual(caught, 1)
477
478    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
479    def test_signal_handling_args(self):
480        some_args = (42,)
481        caught = 0
482
483        def my_handler(*args):
484            nonlocal caught
485            caught += 1
486            self.assertEqual(args, some_args)
487            self.loop.stop()
488
489        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
490
491        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
492        self.loop.call_later(60, self.loop.stop)
493        self.loop.run_forever()
494        self.assertEqual(caught, 1)
495
496    def _basetest_create_connection(self, connection_fut, check_sockname=True):
497        tr, pr = self.loop.run_until_complete(connection_fut)
498        self.assertIsInstance(tr, asyncio.Transport)
499        self.assertIsInstance(pr, asyncio.Protocol)
500        self.assertIs(pr.transport, tr)
501        if check_sockname:
502            self.assertIsNotNone(tr.get_extra_info('sockname'))
503        self.loop.run_until_complete(pr.done)
504        self.assertGreater(pr.nbytes, 0)
505        tr.close()
506
507    def test_create_connection(self):
508        with test_utils.run_test_server() as httpd:
509            conn_fut = self.loop.create_connection(
510                lambda: MyProto(loop=self.loop), *httpd.address)
511            self._basetest_create_connection(conn_fut)
512
513    @support.skip_unless_bind_unix_socket
514    def test_create_unix_connection(self):
515        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
516        # zero-length address for UNIX socket.
517        check_sockname = not broken_unix_getsockname()
518
519        with test_utils.run_test_unix_server() as httpd:
520            conn_fut = self.loop.create_unix_connection(
521                lambda: MyProto(loop=self.loop), httpd.address)
522            self._basetest_create_connection(conn_fut, check_sockname)
523
524    def check_ssl_extra_info(self, client, check_sockname=True,
525                             peername=None, peercert={}):
526        if check_sockname:
527            self.assertIsNotNone(client.get_extra_info('sockname'))
528        if peername:
529            self.assertEqual(peername,
530                             client.get_extra_info('peername'))
531        else:
532            self.assertIsNotNone(client.get_extra_info('peername'))
533        self.assertEqual(peercert,
534                         client.get_extra_info('peercert'))
535
536        # test SSL cipher
537        cipher = client.get_extra_info('cipher')
538        self.assertIsInstance(cipher, tuple)
539        self.assertEqual(len(cipher), 3, cipher)
540        self.assertIsInstance(cipher[0], str)
541        self.assertIsInstance(cipher[1], str)
542        self.assertIsInstance(cipher[2], int)
543
544        # test SSL object
545        sslobj = client.get_extra_info('ssl_object')
546        self.assertIsNotNone(sslobj)
547        self.assertEqual(sslobj.compression(),
548                         client.get_extra_info('compression'))
549        self.assertEqual(sslobj.cipher(),
550                         client.get_extra_info('cipher'))
551        self.assertEqual(sslobj.getpeercert(),
552                         client.get_extra_info('peercert'))
553        self.assertEqual(sslobj.compression(),
554                         client.get_extra_info('compression'))
555
556    def _basetest_create_ssl_connection(self, connection_fut,
557                                        check_sockname=True,
558                                        peername=None):
559        tr, pr = self.loop.run_until_complete(connection_fut)
560        self.assertIsInstance(tr, asyncio.Transport)
561        self.assertIsInstance(pr, asyncio.Protocol)
562        self.assertTrue('ssl' in tr.__class__.__name__.lower())
563        self.check_ssl_extra_info(tr, check_sockname, peername)
564        self.loop.run_until_complete(pr.done)
565        self.assertGreater(pr.nbytes, 0)
566        tr.close()
567
    def _test_create_ssl_connection(self, httpd, create_connection,
                                    check_sockname=True, peername=None):
        """Run the shared SSL client-connection scenarios.

        *create_connection* is called with an ``ssl=`` keyword and must
        return something ``run_until_complete()`` can drive to a
        (transport, protocol) pair.  *httpd* is accepted but not used here.

        Three scenarios are exercised in order: an explicit dummy context,
        ``ssl=True`` with ``ssl.create_default_context`` patched to return a
        dummy context, and ``ssl=True`` with the real default context, which
        is expected to fail certificate verification.
        """
        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
        self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                             peername)

        # ssl.Purpose was introduced in Python 3.4
        if hasattr(ssl, 'Purpose'):
            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
                                          cafile=None, capath=None,
                                          cadata=None):
                """
                A ssl.create_default_context() replacement that doesn't enable
                cert validation.
                """
                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
                return test_utils.dummy_ssl_context()

            # With ssl=True, ssl.create_default_context() should be called
            with mock.patch('ssl.create_default_context',
                            side_effect=_dummy_ssl_create_context) as m:
                conn_fut = create_connection(ssl=True)
                self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                                     peername)
                # Exactly one default context must have been requested.
                self.assertEqual(m.call_count, 1)

        # With the real ssl.create_default_context(), certificate
        # validation will fail
        with self.assertRaises(ssl.SSLError) as cm:
            conn_fut = create_connection(ssl=True)
            # Ignore the "SSL handshake failed" log in debug mode
            with test_utils.disable_logger():
                self._basetest_create_ssl_connection(conn_fut, check_sockname,
                                                     peername)

        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
604
605    @unittest.skipIf(ssl is None, 'No ssl module')
606    def test_create_ssl_connection(self):
607        with test_utils.run_test_server(use_ssl=True) as httpd:
608            create_connection = functools.partial(
609                self.loop.create_connection,
610                lambda: MyProto(loop=self.loop),
611                *httpd.address)
612            self._test_create_ssl_connection(httpd, create_connection,
613                                             peername=httpd.address)
614
615    @support.skip_unless_bind_unix_socket
616    @unittest.skipIf(ssl is None, 'No ssl module')
617    def test_create_ssl_unix_connection(self):
618        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
619        # zero-length address for UNIX socket.
620        check_sockname = not broken_unix_getsockname()
621
622        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
623            create_connection = functools.partial(
624                self.loop.create_unix_connection,
625                lambda: MyProto(loop=self.loop), httpd.address,
626                server_hostname='127.0.0.1')
627
628            self._test_create_ssl_connection(httpd, create_connection,
629                                             check_sockname,
630                                             peername=httpd.address)
631
632    def test_create_connection_local_addr(self):
633        with test_utils.run_test_server() as httpd:
634            port = support.find_unused_port()
635            f = self.loop.create_connection(
636                lambda: MyProto(loop=self.loop),
637                *httpd.address, local_addr=(httpd.address[0], port))
638            tr, pr = self.loop.run_until_complete(f)
639            expected = pr.transport.get_extra_info('sockname')[1]
640            self.assertEqual(port, expected)
641            tr.close()
642
643    def test_create_connection_local_addr_in_use(self):
644        with test_utils.run_test_server() as httpd:
645            f = self.loop.create_connection(
646                lambda: MyProto(loop=self.loop),
647                *httpd.address, local_addr=httpd.address)
648            with self.assertRaises(OSError) as cm:
649                self.loop.run_until_complete(f)
650            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
651            self.assertIn(str(httpd.address), cm.exception.strerror)
652
    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
        # Accept a connection on a plain listening socket, hand it to
        # loop.connect_accepted_socket(), and exchange one message each way
        # with a client running in a helper thread.  The optional SSL
        # contexts let test_ssl_connect_accepted_socket reuse this body.
        loop = self.loop

        # Local protocol (shadows the module-level MyProto) that answers
        # received data and stops the loop once the connection is lost.
        class MyProto(MyBaseProto):

            def connection_lost(self, exc):
                super().connection_lost(exc)
                loop.call_soon(loop.stop)

            def data_received(self, data):
                super().data_received(data)
                # expected_response is assigned below, before any data
                # can arrive.
                self.transport.write(expected_response)

        lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
        addr = lsock.getsockname()

        message = b'test data'
        response = None
        expected_response = b'roger'

        def client():
            # Runs in the helper thread: connect, send the message, read
            # the reply into the enclosing ``response`` and close.
            nonlocal response
            try:
                csock = socket.socket()
                if client_ssl is not None:
                    csock = client_ssl.wrap_socket(csock)
                csock.connect(addr)
                csock.sendall(message)
                response = csock.recv(99)
                csock.close()
            except Exception as exc:
                # Failures are only printed here; the assertions at the
                # end of the test then report the missing response.
                print(
                    "Failure in client thread in test_connect_accepted_socket",
                    exc)

        thread = threading.Thread(target=client, daemon=True)
        thread.start()

        conn, _ = lsock.accept()
        proto = MyProto(loop=loop)
        proto.loop = loop
        loop.run_until_complete(
            loop.connect_accepted_socket(
                (lambda: proto), conn, ssl=server_ssl))
        # Runs until connection_lost() schedules loop.stop.
        loop.run_forever()
        proto.transport.close()
        lsock.close()

        support.join_thread(thread, timeout=1)
        self.assertFalse(thread.is_alive())
        self.assertEqual(proto.state, 'CLOSED')
        self.assertEqual(proto.nbytes, len(message))
        self.assertEqual(response, expected_response)
706
707    @unittest.skipIf(ssl is None, 'No ssl module')
708    def test_ssl_connect_accepted_socket(self):
709        if (sys.platform == 'win32' and
710            sys.version_info < (3, 5) and
711            isinstance(self.loop, proactor_events.BaseProactorEventLoop)
712            ):
713            raise unittest.SkipTest(
714                'SSL not supported with proactor event loops before Python 3.5'
715                )
716
717        server_context = test_utils.simple_server_sslcontext()
718        client_context = test_utils.simple_client_sslcontext()
719
720        self.test_connect_accepted_socket(server_context, client_context)
721
722    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
723        sock = socket.socket()
724        self.addCleanup(sock.close)
725        coro = self.loop.connect_accepted_socket(
726            MyProto, sock, ssl_handshake_timeout=1)
727        with self.assertRaisesRegex(
728                ValueError,
729                'ssl_handshake_timeout is only meaningful with ssl'):
730            self.loop.run_until_complete(coro)
731
    @mock.patch('asyncio.base_events.socket')
    def create_server_multiple_hosts(self, family, hosts, mock_sock):
        """Create a server for several *hosts* and check the bound host set.

        family    -- socket.AF_INET or another family (treated as IPv6-style:
                     4-tuple addresses)
        hosts     -- host strings passed to create_server(); may repeat
        mock_sock -- injected by mock.patch: the mocked ``socket`` module as
                     seen by asyncio.base_events

        Name resolution and serving are stubbed out, so no real sockets are
        opened.
        """
        async def getaddrinfo(host, port, *args, **kw):
            # Echo the requested host/port back as the single result.
            if family == socket.AF_INET:
                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
            else:
                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]

        def getaddrinfo_task(*args, **kwds):
            return self.loop.create_task(getaddrinfo(*args, **kwds))

        unique_hosts = set(hosts)

        # NOTE(review): ``getsockbyname`` is not the name of a real socket
        # method (that would be ``getsockname``); it works here because the
        # socket module is mocked and the same attribute is read back from
        # server.sockets below.  Confirm before renaming either side.
        if family == socket.AF_INET:
            mock_sock.socket().getsockbyname.side_effect = [
                (host, 80) for host in unique_hosts]
        else:
            mock_sock.socket().getsockbyname.side_effect = [
                (host, 80, 0, 0) for host in unique_hosts]
        # Replace resolution and serving hooks on the loop instance.
        self.loop.getaddrinfo = getaddrinfo_task
        self.loop._start_serving = mock.Mock()
        self.loop._stop_serving = mock.Mock()
        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
        server = self.loop.run_until_complete(f)
        self.addCleanup(server.close)
        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
        self.assertEqual(server_hosts, unique_hosts)
759
760    def test_create_server_multiple_hosts_ipv4(self):
761        self.create_server_multiple_hosts(socket.AF_INET,
762                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
763
764    def test_create_server_multiple_hosts_ipv6(self):
765        self.create_server_multiple_hosts(socket.AF_INET6,
766                                          ['::1', '::2', '::1'])
767
768    def test_create_server(self):
769        proto = MyProto(self.loop)
770        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
771        server = self.loop.run_until_complete(f)
772        self.assertEqual(len(server.sockets), 1)
773        sock = server.sockets[0]
774        host, port = sock.getsockname()
775        self.assertEqual(host, '0.0.0.0')
776        client = socket.socket()
777        client.connect(('127.0.0.1', port))
778        client.sendall(b'xxx')
779
780        self.loop.run_until_complete(proto.connected)
781        self.assertEqual('CONNECTED', proto.state)
782
783        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
784        self.assertEqual(3, proto.nbytes)
785
786        # extra info is available
787        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
788        self.assertEqual('127.0.0.1',
789                         proto.transport.get_extra_info('peername')[0])
790
791        # close connection
792        proto.transport.close()
793        self.loop.run_until_complete(proto.done)
794
795        self.assertEqual('CLOSED', proto.state)
796
797        # the client socket must be closed after to avoid ECONNRESET upon
798        # recv()/send() on the serving socket
799        client.close()
800
801        # close server
802        server.close()
803
804    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
805    def test_create_server_reuse_port(self):
806        proto = MyProto(self.loop)
807        f = self.loop.create_server(
808            lambda: proto, '0.0.0.0', 0)
809        server = self.loop.run_until_complete(f)
810        self.assertEqual(len(server.sockets), 1)
811        sock = server.sockets[0]
812        self.assertFalse(
813            sock.getsockopt(
814                socket.SOL_SOCKET, socket.SO_REUSEPORT))
815        server.close()
816
817        test_utils.run_briefly(self.loop)
818
819        proto = MyProto(self.loop)
820        f = self.loop.create_server(
821            lambda: proto, '0.0.0.0', 0, reuse_port=True)
822        server = self.loop.run_until_complete(f)
823        self.assertEqual(len(server.sockets), 1)
824        sock = server.sockets[0]
825        self.assertTrue(
826            sock.getsockopt(
827                socket.SOL_SOCKET, socket.SO_REUSEPORT))
828        server.close()
829
830    def _make_unix_server(self, factory, **kwargs):
831        path = test_utils.gen_unix_socket_path()
832        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
833
834        f = self.loop.create_unix_server(factory, path, **kwargs)
835        server = self.loop.run_until_complete(f)
836
837        return server, path
838
839    @support.skip_unless_bind_unix_socket
840    def test_create_unix_server(self):
841        proto = MyProto(loop=self.loop)
842        server, path = self._make_unix_server(lambda: proto)
843        self.assertEqual(len(server.sockets), 1)
844
845        client = socket.socket(socket.AF_UNIX)
846        client.connect(path)
847        client.sendall(b'xxx')
848
849        self.loop.run_until_complete(proto.connected)
850        self.assertEqual('CONNECTED', proto.state)
851        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
852        self.assertEqual(3, proto.nbytes)
853
854        # close connection
855        proto.transport.close()
856        self.loop.run_until_complete(proto.done)
857
858        self.assertEqual('CLOSED', proto.state)
859
860        # the client socket must be closed after to avoid ECONNRESET upon
861        # recv()/send() on the serving socket
862        client.close()
863
864        # close server
865        server.close()
866
867    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
868    def test_create_unix_server_path_socket_error(self):
869        proto = MyProto(loop=self.loop)
870        sock = socket.socket()
871        with sock:
872            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
873            with self.assertRaisesRegex(ValueError,
874                                        'path and sock can not be specified '
875                                        'at the same time'):
876                self.loop.run_until_complete(f)
877
878    def _create_ssl_context(self, certfile, keyfile=None):
879        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
880        sslcontext.options |= ssl.OP_NO_SSLv2
881        sslcontext.load_cert_chain(certfile, keyfile)
882        return sslcontext
883
884    def _make_ssl_server(self, factory, certfile, keyfile=None):
885        sslcontext = self._create_ssl_context(certfile, keyfile)
886
887        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
888        server = self.loop.run_until_complete(f)
889
890        sock = server.sockets[0]
891        host, port = sock.getsockname()
892        self.assertEqual(host, '127.0.0.1')
893        return server, host, port
894
895    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
896        sslcontext = self._create_ssl_context(certfile, keyfile)
897        return self._make_unix_server(factory, ssl=sslcontext)
898
899    @unittest.skipIf(ssl is None, 'No ssl module')
900    def test_create_server_ssl(self):
901        proto = MyProto(loop=self.loop)
902        server, host, port = self._make_ssl_server(
903            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
904
905        f_c = self.loop.create_connection(MyBaseProto, host, port,
906                                          ssl=test_utils.dummy_ssl_context())
907        client, pr = self.loop.run_until_complete(f_c)
908
909        client.write(b'xxx')
910        self.loop.run_until_complete(proto.connected)
911        self.assertEqual('CONNECTED', proto.state)
912
913        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
914        self.assertEqual(3, proto.nbytes)
915
916        # extra info is available
917        self.check_ssl_extra_info(client, peername=(host, port))
918
919        # close connection
920        proto.transport.close()
921        self.loop.run_until_complete(proto.done)
922        self.assertEqual('CLOSED', proto.state)
923
924        # the client socket must be closed after to avoid ECONNRESET upon
925        # recv()/send() on the serving socket
926        client.close()
927
928        # stop serving
929        server.close()
930
931    @support.skip_unless_bind_unix_socket
932    @unittest.skipIf(ssl is None, 'No ssl module')
933    def test_create_unix_server_ssl(self):
934        proto = MyProto(loop=self.loop)
935        server, path = self._make_ssl_unix_server(
936            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
937
938        f_c = self.loop.create_unix_connection(
939            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
940            server_hostname='')
941
942        client, pr = self.loop.run_until_complete(f_c)
943
944        client.write(b'xxx')
945        self.loop.run_until_complete(proto.connected)
946        self.assertEqual('CONNECTED', proto.state)
947        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
948        self.assertEqual(3, proto.nbytes)
949
950        # close connection
951        proto.transport.close()
952        self.loop.run_until_complete(proto.done)
953        self.assertEqual('CLOSED', proto.state)
954
955        # the client socket must be closed after to avoid ECONNRESET upon
956        # recv()/send() on the serving socket
957        client.close()
958
959        # stop serving
960        server.close()
961
962    @unittest.skipIf(ssl is None, 'No ssl module')
963    def test_create_server_ssl_verify_failed(self):
964        proto = MyProto(loop=self.loop)
965        server, host, port = self._make_ssl_server(
966            lambda: proto, test_utils.SIGNED_CERTFILE)
967
968        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
969        sslcontext_client.options |= ssl.OP_NO_SSLv2
970        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
971        if hasattr(sslcontext_client, 'check_hostname'):
972            sslcontext_client.check_hostname = True
973
974
975        # no CA loaded
976        f_c = self.loop.create_connection(MyProto, host, port,
977                                          ssl=sslcontext_client)
978        with mock.patch.object(self.loop, 'call_exception_handler'):
979            with test_utils.disable_logger():
980                with self.assertRaisesRegex(ssl.SSLError,
981                                            '(?i)certificate.verify.failed'):
982                    self.loop.run_until_complete(f_c)
983
984            # execute the loop to log the connection error
985            test_utils.run_briefly(self.loop)
986
987        # close connection
988        self.assertIsNone(proto.transport)
989        server.close()
990
991    @support.skip_unless_bind_unix_socket
992    @unittest.skipIf(ssl is None, 'No ssl module')
993    def test_create_unix_server_ssl_verify_failed(self):
994        proto = MyProto(loop=self.loop)
995        server, path = self._make_ssl_unix_server(
996            lambda: proto, test_utils.SIGNED_CERTFILE)
997
998        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
999        sslcontext_client.options |= ssl.OP_NO_SSLv2
1000        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1001        if hasattr(sslcontext_client, 'check_hostname'):
1002            sslcontext_client.check_hostname = True
1003
1004        # no CA loaded
1005        f_c = self.loop.create_unix_connection(MyProto, path,
1006                                               ssl=sslcontext_client,
1007                                               server_hostname='invalid')
1008        with mock.patch.object(self.loop, 'call_exception_handler'):
1009            with test_utils.disable_logger():
1010                with self.assertRaisesRegex(ssl.SSLError,
1011                                            '(?i)certificate.verify.failed'):
1012                    self.loop.run_until_complete(f_c)
1013
1014            # execute the loop to log the connection error
1015            test_utils.run_briefly(self.loop)
1016
1017        # close connection
1018        self.assertIsNone(proto.transport)
1019        server.close()
1020
1021    @unittest.skipIf(ssl is None, 'No ssl module')
1022    def test_create_server_ssl_match_failed(self):
1023        proto = MyProto(loop=self.loop)
1024        server, host, port = self._make_ssl_server(
1025            lambda: proto, test_utils.SIGNED_CERTFILE)
1026
1027        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1028        sslcontext_client.options |= ssl.OP_NO_SSLv2
1029        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1030        sslcontext_client.load_verify_locations(
1031            cafile=test_utils.SIGNING_CA)
1032        if hasattr(sslcontext_client, 'check_hostname'):
1033            sslcontext_client.check_hostname = True
1034
1035        # incorrect server_hostname
1036        f_c = self.loop.create_connection(MyProto, host, port,
1037                                          ssl=sslcontext_client)
1038        with mock.patch.object(self.loop, 'call_exception_handler'):
1039            with test_utils.disable_logger():
1040                with self.assertRaisesRegex(
1041                        ssl.CertificateError,
1042                        "IP address mismatch, certificate is not valid for "
1043                        "'127.0.0.1'"):
1044                    self.loop.run_until_complete(f_c)
1045
1046        # close connection
1047        # transport is None because TLS ALERT aborted the handshake
1048        self.assertIsNone(proto.transport)
1049        server.close()
1050
1051    @support.skip_unless_bind_unix_socket
1052    @unittest.skipIf(ssl is None, 'No ssl module')
1053    def test_create_unix_server_ssl_verified(self):
1054        proto = MyProto(loop=self.loop)
1055        server, path = self._make_ssl_unix_server(
1056            lambda: proto, test_utils.SIGNED_CERTFILE)
1057
1058        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1059        sslcontext_client.options |= ssl.OP_NO_SSLv2
1060        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1061        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1062        if hasattr(sslcontext_client, 'check_hostname'):
1063            sslcontext_client.check_hostname = True
1064
1065        # Connection succeeds with correct CA and server hostname.
1066        f_c = self.loop.create_unix_connection(MyProto, path,
1067                                               ssl=sslcontext_client,
1068                                               server_hostname='localhost')
1069        client, pr = self.loop.run_until_complete(f_c)
1070
1071        # close connection
1072        proto.transport.close()
1073        client.close()
1074        server.close()
1075        self.loop.run_until_complete(proto.done)
1076
1077    @unittest.skipIf(ssl is None, 'No ssl module')
1078    def test_create_server_ssl_verified(self):
1079        proto = MyProto(loop=self.loop)
1080        server, host, port = self._make_ssl_server(
1081            lambda: proto, test_utils.SIGNED_CERTFILE)
1082
1083        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1084        sslcontext_client.options |= ssl.OP_NO_SSLv2
1085        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1086        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1087        if hasattr(sslcontext_client, 'check_hostname'):
1088            sslcontext_client.check_hostname = True
1089
1090        # Connection succeeds with correct CA and server hostname.
1091        f_c = self.loop.create_connection(MyProto, host, port,
1092                                          ssl=sslcontext_client,
1093                                          server_hostname='localhost')
1094        client, pr = self.loop.run_until_complete(f_c)
1095
1096        # extra info is available
1097        self.check_ssl_extra_info(client, peername=(host, port),
1098                                  peercert=test_utils.PEERCERT)
1099
1100        # close connection
1101        proto.transport.close()
1102        client.close()
1103        server.close()
1104        self.loop.run_until_complete(proto.done)
1105
1106    def test_create_server_sock(self):
1107        proto = self.loop.create_future()
1108
1109        class TestMyProto(MyProto):
1110            def connection_made(self, transport):
1111                super().connection_made(transport)
1112                proto.set_result(self)
1113
1114        sock_ob = socket.create_server(('0.0.0.0', 0))
1115
1116        f = self.loop.create_server(TestMyProto, sock=sock_ob)
1117        server = self.loop.run_until_complete(f)
1118        sock = server.sockets[0]
1119        self.assertEqual(sock.fileno(), sock_ob.fileno())
1120
1121        host, port = sock.getsockname()
1122        self.assertEqual(host, '0.0.0.0')
1123        client = socket.socket()
1124        client.connect(('127.0.0.1', port))
1125        client.send(b'xxx')
1126        client.close()
1127        server.close()
1128
1129    def test_create_server_addr_in_use(self):
1130        sock_ob = socket.create_server(('0.0.0.0', 0))
1131
1132        f = self.loop.create_server(MyProto, sock=sock_ob)
1133        server = self.loop.run_until_complete(f)
1134        sock = server.sockets[0]
1135        host, port = sock.getsockname()
1136
1137        f = self.loop.create_server(MyProto, host=host, port=port)
1138        with self.assertRaises(OSError) as cm:
1139            self.loop.run_until_complete(f)
1140        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1141
1142        server.close()
1143
1144    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1145    def test_create_server_dual_stack(self):
1146        f_proto = self.loop.create_future()
1147
1148        class TestMyProto(MyProto):
1149            def connection_made(self, transport):
1150                super().connection_made(transport)
1151                f_proto.set_result(self)
1152
1153        try_count = 0
1154        while True:
1155            try:
1156                port = support.find_unused_port()
1157                f = self.loop.create_server(TestMyProto, host=None, port=port)
1158                server = self.loop.run_until_complete(f)
1159            except OSError as ex:
1160                if ex.errno == errno.EADDRINUSE:
1161                    try_count += 1
1162                    self.assertGreaterEqual(5, try_count)
1163                    continue
1164                else:
1165                    raise
1166            else:
1167                break
1168        client = socket.socket()
1169        client.connect(('127.0.0.1', port))
1170        client.send(b'xxx')
1171        proto = self.loop.run_until_complete(f_proto)
1172        proto.transport.close()
1173        client.close()
1174
1175        f_proto = self.loop.create_future()
1176        client = socket.socket(socket.AF_INET6)
1177        client.connect(('::1', port))
1178        client.send(b'xxx')
1179        proto = self.loop.run_until_complete(f_proto)
1180        proto.transport.close()
1181        client.close()
1182
1183        server.close()
1184
1185    def test_server_close(self):
1186        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1187        server = self.loop.run_until_complete(f)
1188        sock = server.sockets[0]
1189        host, port = sock.getsockname()
1190
1191        client = socket.socket()
1192        client.connect(('127.0.0.1', port))
1193        client.send(b'xxx')
1194        client.close()
1195
1196        server.close()
1197
1198        client = socket.socket()
1199        self.assertRaises(
1200            ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1201        client.close()
1202
1203    def test_create_datagram_endpoint(self):
1204        class TestMyDatagramProto(MyDatagramProto):
1205            def __init__(inner_self):
1206                super().__init__(loop=self.loop)
1207
1208            def datagram_received(self, data, addr):
1209                super().datagram_received(data, addr)
1210                self.transport.sendto(b'resp:'+data, addr)
1211
1212        coro = self.loop.create_datagram_endpoint(
1213            TestMyDatagramProto, local_addr=('127.0.0.1', 0))
1214        s_transport, server = self.loop.run_until_complete(coro)
1215        host, port = s_transport.get_extra_info('sockname')
1216
1217        self.assertIsInstance(s_transport, asyncio.Transport)
1218        self.assertIsInstance(server, TestMyDatagramProto)
1219        self.assertEqual('INITIALIZED', server.state)
1220        self.assertIs(server.transport, s_transport)
1221
1222        coro = self.loop.create_datagram_endpoint(
1223            lambda: MyDatagramProto(loop=self.loop),
1224            remote_addr=(host, port))
1225        transport, client = self.loop.run_until_complete(coro)
1226
1227        self.assertIsInstance(transport, asyncio.Transport)
1228        self.assertIsInstance(client, MyDatagramProto)
1229        self.assertEqual('INITIALIZED', client.state)
1230        self.assertIs(client.transport, transport)
1231
1232        transport.sendto(b'xxx')
1233        test_utils.run_until(self.loop, lambda: server.nbytes)
1234        self.assertEqual(3, server.nbytes)
1235        test_utils.run_until(self.loop, lambda: client.nbytes)
1236
1237        # received
1238        self.assertEqual(8, client.nbytes)
1239
1240        # extra info is available
1241        self.assertIsNotNone(transport.get_extra_info('sockname'))
1242
1243        # close connection
1244        transport.close()
1245        self.loop.run_until_complete(client.done)
1246        self.assertEqual('CLOSED', client.state)
1247        server.transport.close()
1248
1249    def test_create_datagram_endpoint_sock(self):
1250        sock = None
1251        local_address = ('127.0.0.1', 0)
1252        infos = self.loop.run_until_complete(
1253            self.loop.getaddrinfo(
1254                *local_address, type=socket.SOCK_DGRAM))
1255        for family, type, proto, cname, address in infos:
1256            try:
1257                sock = socket.socket(family=family, type=type, proto=proto)
1258                sock.setblocking(False)
1259                sock.bind(address)
1260            except:
1261                pass
1262            else:
1263                break
1264        else:
1265            assert False, 'Can not create socket.'
1266
1267        f = self.loop.create_datagram_endpoint(
1268            lambda: MyDatagramProto(loop=self.loop), sock=sock)
1269        tr, pr = self.loop.run_until_complete(f)
1270        self.assertIsInstance(tr, asyncio.Transport)
1271        self.assertIsInstance(pr, MyDatagramProto)
1272        tr.close()
1273        self.loop.run_until_complete(pr.done)
1274
1275    def test_internal_fds(self):
1276        loop = self.create_event_loop()
1277        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1278            loop.close()
1279            self.skipTest('loop is not a BaseSelectorEventLoop')
1280
1281        self.assertEqual(1, loop._internal_fds)
1282        loop.close()
1283        self.assertEqual(0, loop._internal_fds)
1284        self.assertIsNone(loop._csock)
1285        self.assertIsNone(loop._ssock)
1286
1287    @unittest.skipUnless(sys.platform != 'win32',
1288                         "Don't support pipes for Windows")
1289    def test_read_pipe(self):
1290        proto = MyReadPipeProto(loop=self.loop)
1291
1292        rpipe, wpipe = os.pipe()
1293        pipeobj = io.open(rpipe, 'rb', 1024)
1294
1295        async def connect():
1296            t, p = await self.loop.connect_read_pipe(
1297                lambda: proto, pipeobj)
1298            self.assertIs(p, proto)
1299            self.assertIs(t, proto.transport)
1300            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1301            self.assertEqual(0, proto.nbytes)
1302
1303        self.loop.run_until_complete(connect())
1304
1305        os.write(wpipe, b'1')
1306        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1307        self.assertEqual(1, proto.nbytes)
1308
1309        os.write(wpipe, b'2345')
1310        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1311        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1312        self.assertEqual(5, proto.nbytes)
1313
1314        os.close(wpipe)
1315        self.loop.run_until_complete(proto.done)
1316        self.assertEqual(
1317            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1318        # extra info is available
1319        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1320
1321    @unittest.skipUnless(sys.platform != 'win32',
1322                         "Don't support pipes for Windows")
1323    def test_unclosed_pipe_transport(self):
1324        # This test reproduces the issue #314 on GitHub
1325        loop = self.create_event_loop()
1326        read_proto = MyReadPipeProto(loop=loop)
1327        write_proto = MyWritePipeProto(loop=loop)
1328
1329        rpipe, wpipe = os.pipe()
1330        rpipeobj = io.open(rpipe, 'rb', 1024)
1331        wpipeobj = io.open(wpipe, 'w', 1024)
1332
1333        async def connect():
1334            read_transport, _ = await loop.connect_read_pipe(
1335                lambda: read_proto, rpipeobj)
1336            write_transport, _ = await loop.connect_write_pipe(
1337                lambda: write_proto, wpipeobj)
1338            return read_transport, write_transport
1339
1340        # Run and close the loop without closing the transports
1341        read_transport, write_transport = loop.run_until_complete(connect())
1342        loop.close()
1343
1344        # These 'repr' calls used to raise an AttributeError
1345        # See Issue #314 on GitHub
1346        self.assertIn('open', repr(read_transport))
1347        self.assertIn('open', repr(write_transport))
1348
1349        # Clean up (avoid ResourceWarning)
1350        rpipeobj.close()
1351        wpipeobj.close()
1352        read_transport._pipe = None
1353        write_transport._pipe = None
1354
1355    @unittest.skipUnless(sys.platform != 'win32',
1356                         "Don't support pipes for Windows")
1357    def test_read_pty_output(self):
1358        proto = MyReadPipeProto(loop=self.loop)
1359
1360        master, slave = os.openpty()
1361        master_read_obj = io.open(master, 'rb', 0)
1362
1363        async def connect():
1364            t, p = await self.loop.connect_read_pipe(lambda: proto,
1365                                                     master_read_obj)
1366            self.assertIs(p, proto)
1367            self.assertIs(t, proto.transport)
1368            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1369            self.assertEqual(0, proto.nbytes)
1370
1371        self.loop.run_until_complete(connect())
1372
1373        os.write(slave, b'1')
1374        test_utils.run_until(self.loop, lambda: proto.nbytes)
1375        self.assertEqual(1, proto.nbytes)
1376
1377        os.write(slave, b'2345')
1378        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1379        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1380        self.assertEqual(5, proto.nbytes)
1381
1382        os.close(slave)
1383        proto.transport.close()
1384        self.loop.run_until_complete(proto.done)
1385        self.assertEqual(
1386            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1387        # extra info is available
1388        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1389
1390    @unittest.skipUnless(sys.platform != 'win32',
1391                         "Don't support pipes for Windows")
1392    def test_write_pipe(self):
1393        rpipe, wpipe = os.pipe()
1394        pipeobj = io.open(wpipe, 'wb', 1024)
1395
1396        proto = MyWritePipeProto(loop=self.loop)
1397        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1398        transport, p = self.loop.run_until_complete(connect)
1399        self.assertIs(p, proto)
1400        self.assertIs(transport, proto.transport)
1401        self.assertEqual('CONNECTED', proto.state)
1402
1403        transport.write(b'1')
1404
1405        data = bytearray()
1406        def reader(data):
1407            chunk = os.read(rpipe, 1024)
1408            data += chunk
1409            return len(data)
1410
1411        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1412        self.assertEqual(b'1', data)
1413
1414        transport.write(b'2345')
1415        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1416        self.assertEqual(b'12345', data)
1417        self.assertEqual('CONNECTED', proto.state)
1418
1419        os.close(rpipe)
1420
1421        # extra info is available
1422        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1423
1424        # close connection
1425        proto.transport.close()
1426        self.loop.run_until_complete(proto.done)
1427        self.assertEqual('CLOSED', proto.state)
1428
1429    @unittest.skipUnless(sys.platform != 'win32',
1430                         "Don't support pipes for Windows")
1431    def test_write_pipe_disconnect_on_close(self):
1432        rsock, wsock = socket.socketpair()
1433        rsock.setblocking(False)
1434        pipeobj = io.open(wsock.detach(), 'wb', 1024)
1435
1436        proto = MyWritePipeProto(loop=self.loop)
1437        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1438        transport, p = self.loop.run_until_complete(connect)
1439        self.assertIs(p, proto)
1440        self.assertIs(transport, proto.transport)
1441        self.assertEqual('CONNECTED', proto.state)
1442
1443        transport.write(b'1')
1444        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1445        self.assertEqual(b'1', data)
1446
1447        rsock.close()
1448
1449        self.loop.run_until_complete(proto.done)
1450        self.assertEqual('CLOSED', proto.state)
1451
1452    @unittest.skipUnless(sys.platform != 'win32',
1453                         "Don't support pipes for Windows")
1454    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1455    # older than 10.6 (Snow Leopard)
1456    @support.requires_mac_ver(10, 6)
1457    def test_write_pty(self):
1458        master, slave = os.openpty()
1459        slave_write_obj = io.open(slave, 'wb', 0)
1460
1461        proto = MyWritePipeProto(loop=self.loop)
1462        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1463        transport, p = self.loop.run_until_complete(connect)
1464        self.assertIs(p, proto)
1465        self.assertIs(transport, proto.transport)
1466        self.assertEqual('CONNECTED', proto.state)
1467
1468        transport.write(b'1')
1469
1470        data = bytearray()
1471        def reader(data):
1472            chunk = os.read(master, 1024)
1473            data += chunk
1474            return len(data)
1475
1476        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1477                             timeout=10)
1478        self.assertEqual(b'1', data)
1479
1480        transport.write(b'2345')
1481        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1482                             timeout=10)
1483        self.assertEqual(b'12345', data)
1484        self.assertEqual('CONNECTED', proto.state)
1485
1486        os.close(master)
1487
1488        # extra info is available
1489        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1490
1491        # close connection
1492        proto.transport.close()
1493        self.loop.run_until_complete(proto.done)
1494        self.assertEqual('CLOSED', proto.state)
1495
1496    @unittest.skipUnless(sys.platform != 'win32',
1497                         "Don't support pipes for Windows")
1498    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1499    # older than 10.6 (Snow Leopard)
1500    @support.requires_mac_ver(10, 6)
1501    def test_bidirectional_pty(self):
1502        master, read_slave = os.openpty()
1503        write_slave = os.dup(read_slave)
1504        tty.setraw(read_slave)
1505
1506        slave_read_obj = io.open(read_slave, 'rb', 0)
1507        read_proto = MyReadPipeProto(loop=self.loop)
1508        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
1509                                                   slave_read_obj)
1510        read_transport, p = self.loop.run_until_complete(read_connect)
1511        self.assertIs(p, read_proto)
1512        self.assertIs(read_transport, read_proto.transport)
1513        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1514        self.assertEqual(0, read_proto.nbytes)
1515
1516
1517        slave_write_obj = io.open(write_slave, 'wb', 0)
1518        write_proto = MyWritePipeProto(loop=self.loop)
1519        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
1520                                                     slave_write_obj)
1521        write_transport, p = self.loop.run_until_complete(write_connect)
1522        self.assertIs(p, write_proto)
1523        self.assertIs(write_transport, write_proto.transport)
1524        self.assertEqual('CONNECTED', write_proto.state)
1525
1526        data = bytearray()
1527        def reader(data):
1528            chunk = os.read(master, 1024)
1529            data += chunk
1530            return len(data)
1531
1532        write_transport.write(b'1')
1533        test_utils.run_until(self.loop, lambda: reader(data) >= 1, timeout=10)
1534        self.assertEqual(b'1', data)
1535        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1536        self.assertEqual('CONNECTED', write_proto.state)
1537
1538        os.write(master, b'a')
1539        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
1540                             timeout=10)
1541        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1542        self.assertEqual(1, read_proto.nbytes)
1543        self.assertEqual('CONNECTED', write_proto.state)
1544
1545        write_transport.write(b'2345')
1546        test_utils.run_until(self.loop, lambda: reader(data) >= 5, timeout=10)
1547        self.assertEqual(b'12345', data)
1548        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1549        self.assertEqual('CONNECTED', write_proto.state)
1550
1551        os.write(master, b'bcde')
1552        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
1553                             timeout=10)
1554        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
1555        self.assertEqual(5, read_proto.nbytes)
1556        self.assertEqual('CONNECTED', write_proto.state)
1557
1558        os.close(master)
1559
1560        read_transport.close()
1561        self.loop.run_until_complete(read_proto.done)
1562        self.assertEqual(
1563            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
1564
1565        write_transport.close()
1566        self.loop.run_until_complete(write_proto.done)
1567        self.assertEqual('CLOSED', write_proto.state)
1568
1569    def test_prompt_cancellation(self):
1570        r, w = socket.socketpair()
1571        r.setblocking(False)
1572        f = self.loop.create_task(self.loop.sock_recv(r, 1))
1573        ov = getattr(f, 'ov', None)
1574        if ov is not None:
1575            self.assertTrue(ov.pending)
1576
1577        async def main():
1578            try:
1579                self.loop.call_soon(f.cancel)
1580                await f
1581            except asyncio.CancelledError:
1582                res = 'cancelled'
1583            else:
1584                res = None
1585            finally:
1586                self.loop.stop()
1587            return res
1588
1589        start = time.monotonic()
1590        t = self.loop.create_task(main())
1591        self.loop.run_forever()
1592        elapsed = time.monotonic() - start
1593
1594        self.assertLess(elapsed, 0.1)
1595        self.assertEqual(t.result(), 'cancelled')
1596        self.assertRaises(asyncio.CancelledError, f.result)
1597        if ov is not None:
1598            self.assertFalse(ov.pending)
1599        self.loop._stop_serving(r)
1600
1601        r.close()
1602        w.close()
1603
1604    def test_timeout_rounding(self):
1605        def _run_once():
1606            self.loop._run_once_counter += 1
1607            orig_run_once()
1608
1609        orig_run_once = self.loop._run_once
1610        self.loop._run_once_counter = 0
1611        self.loop._run_once = _run_once
1612
1613        async def wait():
1614            loop = self.loop
1615            await asyncio.sleep(1e-2)
1616            await asyncio.sleep(1e-4)
1617            await asyncio.sleep(1e-6)
1618            await asyncio.sleep(1e-8)
1619            await asyncio.sleep(1e-10)
1620
1621        self.loop.run_until_complete(wait())
1622        # The ideal number of call is 12, but on some platforms, the selector
1623        # may sleep at little bit less than timeout depending on the resolution
1624        # of the clock used by the kernel. Tolerate a few useless calls on
1625        # these platforms.
1626        self.assertLessEqual(self.loop._run_once_counter, 20,
1627            {'clock_resolution': self.loop._clock_resolution,
1628             'selector': self.loop._selector.__class__.__name__})
1629
1630    def test_remove_fds_after_closing(self):
1631        loop = self.create_event_loop()
1632        callback = lambda: None
1633        r, w = socket.socketpair()
1634        self.addCleanup(r.close)
1635        self.addCleanup(w.close)
1636        loop.add_reader(r, callback)
1637        loop.add_writer(w, callback)
1638        loop.close()
1639        self.assertFalse(loop.remove_reader(r))
1640        self.assertFalse(loop.remove_writer(w))
1641
1642    def test_add_fds_after_closing(self):
1643        loop = self.create_event_loop()
1644        callback = lambda: None
1645        r, w = socket.socketpair()
1646        self.addCleanup(r.close)
1647        self.addCleanup(w.close)
1648        loop.close()
1649        with self.assertRaises(RuntimeError):
1650            loop.add_reader(r, callback)
1651        with self.assertRaises(RuntimeError):
1652            loop.add_writer(w, callback)
1653
1654    def test_close_running_event_loop(self):
1655        async def close_loop(loop):
1656            self.loop.close()
1657
1658        coro = close_loop(self.loop)
1659        with self.assertRaises(RuntimeError):
1660            self.loop.run_until_complete(coro)
1661
1662    def test_close(self):
1663        self.loop.close()
1664
1665        async def test():
1666            pass
1667
1668        func = lambda: False
1669        coro = test()
1670        self.addCleanup(coro.close)
1671
1672        # operation blocked when the loop is closed
1673        with self.assertRaises(RuntimeError):
1674            self.loop.run_forever()
1675        with self.assertRaises(RuntimeError):
1676            fut = self.loop.create_future()
1677            self.loop.run_until_complete(fut)
1678        with self.assertRaises(RuntimeError):
1679            self.loop.call_soon(func)
1680        with self.assertRaises(RuntimeError):
1681            self.loop.call_soon_threadsafe(func)
1682        with self.assertRaises(RuntimeError):
1683            self.loop.call_later(1.0, func)
1684        with self.assertRaises(RuntimeError):
1685            self.loop.call_at(self.loop.time() + .0, func)
1686        with self.assertRaises(RuntimeError):
1687            self.loop.create_task(coro)
1688        with self.assertRaises(RuntimeError):
1689            self.loop.add_signal_handler(signal.SIGTERM, func)
1690
1691        # run_in_executor test is tricky: the method is a coroutine,
1692        # but run_until_complete cannot be called on closed loop.
1693        # Thus iterate once explicitly.
1694        with self.assertRaises(RuntimeError):
1695            it = self.loop.run_in_executor(None, func).__await__()
1696            next(it)
1697
1698
class SubprocessTestsMixin:
    """Subprocess transport tests shared by the concrete loop test cases.

    The class mixing this in is expected to provide ``self.loop`` and the
    ``unittest.TestCase`` assertion helpers used below.
    """

    def check_terminated(self, returncode):
        """Validate *returncode* of a child stopped with ``terminate()``."""
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    def check_killed(self, returncode):
        """Validate *returncode* of a child that is expected to be killed."""
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_subprocess_exec(self):
        # Write to the child's stdin, read the echoed data on stdout, then
        # close the transport while the child is still running.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)
            self.assertEqual('CONNECTED', proto.state)

            stdin = transp.get_pipe_transport(0)
            stdin.write(b'Python The Winner')
            self.loop.run_until_complete(proto.got_data[1].wait())
            with test_utils.disable_logger():
                transp.close()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)
            self.assertEqual(b'Python The Winner', proto.data[1])

    def test_subprocess_interactive(self):
        # Two successive writes; the data seen on stdout accumulates.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)
            self.assertEqual('CONNECTED', proto.state)

            stdin = transp.get_pipe_transport(0)
            stdin.write(b'Python ')
            self.loop.run_until_complete(proto.got_data[1].wait())
            # Reset the event so the next wait() blocks until new data.
            proto.got_data[1].clear()
            self.assertEqual(b'Python ', proto.data[1])

            stdin.write(b'The Winner')
            self.loop.run_until_complete(proto.got_data[1].wait())
            self.assertEqual(b'Python The Winner', proto.data[1])

            with test_utils.disable_logger():
                transp.close()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)

    def test_subprocess_shell(self):
        # Run a shell command and check its exit status and captured output.
        with self.assertWarns(DeprecationWarning):
            connect = self.loop.subprocess_shell(
                functools.partial(MySubprocessProtocol, self.loop),
                'echo Python')
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.get_pipe_transport(0).close()
            self.loop.run_until_complete(proto.completed)
            self.assertEqual(0, proto.returncode)
            self.assertTrue(all(f.done() for f in proto.disconnects.values()))
            self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
            self.assertEqual(proto.data[2], b'')
            transp.close()

    def test_subprocess_exitcode(self):
        # The shell's exit status is reported through the protocol.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_close_after_finish(self):
        # With no pipes requested there are no pipe transports, and closing
        # the transport after the child has exited returns None.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None)
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.assertIsNone(transp.get_pipe_transport(0))
        self.assertIsNone(transp.get_pipe_transport(1))
        self.assertIsNone(transp.get_pipe_transport(2))
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        self.assertIsNone(transp.close())

    def test_subprocess_kill(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.kill()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)
            transp.close()

    def test_subprocess_terminate(self):
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.terminate()
            self.loop.run_until_complete(proto.completed)
            self.check_terminated(proto.returncode)
            transp.close()

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_subprocess_send_signal(self):
        # bpo-31034: Make sure that we get the default signal handler (killing
        # the process). The parent process may have decided to ignore SIGHUP,
        # and signal handlers are inherited.
        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
        try:
            prog = os.path.join(os.path.dirname(__file__), 'echo.py')

            connect = self.loop.subprocess_exec(
                functools.partial(MySubprocessProtocol, self.loop),
                sys.executable, prog)

            with self.assertWarns(DeprecationWarning):
                transp, proto = self.loop.run_until_complete(connect)
                self.assertIsInstance(proto, MySubprocessProtocol)
                self.loop.run_until_complete(proto.connected)

                transp.send_signal(signal.SIGHUP)
                self.loop.run_until_complete(proto.completed)
                self.assertEqual(-signal.SIGHUP, proto.returncode)
                transp.close()
        finally:
            # Restore whatever SIGHUP handler was installed before the test.
            signal.signal(signal.SIGHUP, old_handler)

    def test_subprocess_stderr(self):
        # stdout and stderr of the child are collected separately.
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            stdin = transp.get_pipe_transport(0)
            stdin.write(b'test')

            self.loop.run_until_complete(proto.completed)

            transp.close()
            self.assertEqual(b'OUT:test', proto.data[1])
            self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
            self.assertEqual(0, proto.returncode)

    def test_subprocess_stderr_redirect_to_stdout(self):
        # With stderr=subprocess.STDOUT there is no separate stderr pipe and
        # both streams show up on stdout.
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog, stderr=subprocess.STDOUT)

        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            stdin = transp.get_pipe_transport(0)
            self.assertIsNotNone(transp.get_pipe_transport(1))
            self.assertIsNone(transp.get_pipe_transport(2))

            stdin.write(b'test')
            self.loop.run_until_complete(proto.completed)
            self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
                            proto.data[1])
            self.assertEqual(b'', proto.data[2])

            transp.close()
            self.assertEqual(0, proto.returncode)

    def test_subprocess_close_client_stream(self):
        # Close our end of the child's stdout, then make the child write
        # again and check the error it reports on stderr.
        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            stdin = transp.get_pipe_transport(0)
            stdout = transp.get_pipe_transport(1)
            stdin.write(b'test')
            self.loop.run_until_complete(proto.got_data[1].wait())
            self.assertEqual(b'OUT:test', proto.data[1])

            stdout.close()
            self.loop.run_until_complete(proto.disconnects[1])
            stdin.write(b'xxx')
            self.loop.run_until_complete(proto.got_data[2].wait())
            if sys.platform != 'win32':
                self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
            else:
                # After closing the read-end of a pipe, writing to the
                # write-end using os.write() fails with errno==EINVAL and
                # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
                # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
                self.assertEqual(b'ERR:OSError', proto.data[2])
            with test_utils.disable_logger():
                transp.close()
            self.loop.run_until_complete(proto.completed)
            self.check_killed(proto.returncode)

    def test_subprocess_wait_no_same_group(self):
        # start the new process in a new session
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None,
            start_new_session=True)
        # Run the coroutine directly.  This method must stay a plain
        # function: a generator function would only produce a generator
        # object when called by unittest and its body would never execute.
        with self.assertWarns(DeprecationWarning):
            transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_exec_invalid_args(self):
        # Each of these keyword arguments is rejected with ValueError.
        async def connect(**kwds):
            await self.loop.subprocess_exec(
                asyncio.SubprocessProtocol,
                'pwd', **kwds)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(universal_newlines=True))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(bufsize=4096))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(shell=True))

    def test_subprocess_shell_invalid_args(self):
        # A non-string command and each of these keyword arguments are
        # rejected with ValueError.

        async def connect(cmd=None, **kwds):
            if not cmd:
                cmd = 'pwd'
            await self.loop.subprocess_shell(
                asyncio.SubprocessProtocol,
                cmd, **kwds)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(['ls', '-l']))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(universal_newlines=True))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(bufsize=4096))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(shell=False))
1988
1989
if sys.platform != 'win32':
    import selectors

    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
        """Loop tests that install a SafeChildWatcher around every test."""

        def setUp(self):
            super().setUp()
            child_watcher = asyncio.SafeChildWatcher()
            child_watcher.attach_loop(self.loop)
            asyncio.set_child_watcher(child_watcher)

        def tearDown(self):
            # Drop the watcher before the base class tears the loop down.
            asyncio.set_child_watcher(None)
            super().tearDown()


    if hasattr(selectors, 'KqueueSelector'):
        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
                                   SubprocessTestsMixin,
                                   test_utils.TestCase):
            """Run the loop and subprocess tests on a kqueue selector."""

            def create_event_loop(self):
                """Return a selector event loop backed by KqueueSelector."""
                selector = selectors.KqueueSelector()
                return asyncio.SelectorEventLoop(selector)

            # kqueue doesn't support character devices (PTY) on Mac OS X older
            # than 10.9 (Maverick)
            @support.requires_mac_ver(10, 9)
            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
            # hangs on OpenBSD 5.5
            @unittest.skipIf(sys.platform.startswith('openbsd'),
                             'test hangs on OpenBSD')
            def test_read_pty_output(self):
                super().test_read_pty_output()

            # kqueue doesn't support character devices (PTY) on Mac OS X older
            # than 10.9 (Maverick)
            @support.requires_mac_ver(10, 9)
            def test_write_pty(self):
                super().test_write_pty()

    if hasattr(selectors, 'EpollSelector'):
        class EPollEventLoopTests(UnixEventLoopTestsMixin,
                                  SubprocessTestsMixin,
                                  test_utils.TestCase):
            """Run the loop and subprocess tests on an epoll selector."""

            def create_event_loop(self):
                """Return a selector event loop backed by EpollSelector."""
                selector = selectors.EpollSelector()
                return asyncio.SelectorEventLoop(selector)

    if hasattr(selectors, 'PollSelector'):
        class PollEventLoopTests(UnixEventLoopTestsMixin,
                                 SubprocessTestsMixin,
                                 test_utils.TestCase):
            """Run the loop and subprocess tests on a poll selector."""

            def create_event_loop(self):
                """Return a selector event loop backed by PollSelector."""
                selector = selectors.PollSelector()
                return asyncio.SelectorEventLoop(selector)

    # Should always exist.
    class SelectEventLoopTests(UnixEventLoopTestsMixin,
                               SubprocessTestsMixin,
                               test_utils.TestCase):
        """Run the loop and subprocess tests on a select() selector."""

        def create_event_loop(self):
            """Return a selector event loop backed by SelectSelector."""
            selector = selectors.SelectSelector()
            return asyncio.SelectorEventLoop(selector)
else:

    class SelectEventLoopTests(EventLoopTestsMixin,
                               test_utils.TestCase):
        """Run the loop tests on the default selector event loop."""

        def create_event_loop(self):
            """Return a selector event loop with its default selector."""
            loop = asyncio.SelectorEventLoop()
            return loop

    class ProactorEventLoopTests(EventLoopTestsMixin,
                                 SubprocessTestsMixin,
                                 test_utils.TestCase):
        """Run the loop and subprocess tests on the proactor event loop."""

        def create_event_loop(self):
            """Return a new proactor event loop."""
            loop = asyncio.ProactorEventLoop()
            return loop

        # The inherited reader/writer callback tests are overridden to
        # report themselves as skipped on this loop.

        def test_reader_callback(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")

        def test_reader_callback_cancel(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")

        def test_writer_callback(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")

        def test_writer_callback_cancel(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")

        def test_remove_fds_after_closing(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2082
2083
def noop(*args, **kwargs):
    """Accept any positional and keyword arguments and do nothing."""
    return None
2086
2087
class HandleTests(test_utils.TestCase):
    """Tests for asyncio.Handle: state, repr and debug source tracebacks."""

    def setUp(self):
        super().setUp()
        # Most tests only need a loop-like object, so a mock reporting debug
        # mode is used; individual tests override get_debug() as needed.
        self.loop = mock.Mock()
        self.loop.get_debug.return_value = True

    def test_handle(self):
        def callback(*args):
            return args

        args = ()
        h = asyncio.Handle(callback, args, self.loop)
        self.assertIs(h._callback, callback)
        self.assertIs(h._args, args)
        self.assertFalse(h.cancelled())

        h.cancel()
        self.assertTrue(h.cancelled())

    def test_callback_with_exception(self):
        # An exception raised by the callback is reported through the loop's
        # call_exception_handler() instead of propagating out of _run().
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })

    def test_handle_weakref(self):
        wd = weakref.WeakValueDictionary()
        h = asyncio.Handle(lambda: None, (), self.loop)
        wd['h'] = h  # Would fail without __weakref__ slot.

    def test_handle_repr(self):
        self.loop.get_debug.return_value = False

        # simple function
        h = asyncio.Handle(noop, (1, 2), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<Handle noop(1, 2) at %s:%s>'
                        % (filename, lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<Handle cancelled>')

        # decorated function
        with self.assertWarns(DeprecationWarning):
            cb = asyncio.coroutine(noop)
        h = asyncio.Handle(cb, (), self.loop)
        self.assertEqual(repr(h),
                        '<Handle noop() at %s:%s>'
                        % (filename, lineno))

        # partial function
        cb = functools.partial(noop, 1, 2)
        h = asyncio.Handle(cb, (3,), self.loop)
        regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
                 % (re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

        # partial function with keyword args
        cb = functools.partial(noop, x=1)
        h = asyncio.Handle(cb, (2, 3), self.loop)
        regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
                 % (re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

        # partial method
        method = HandleTests.test_handle_repr
        cb = functools.partialmethod(method)
        filename, lineno = test_utils.get_function_source(method)
        h = asyncio.Handle(cb, (), self.loop)

        cb_regex = r'<function HandleTests.test_handle_repr .*>'
        cb_regex = (r'functools.partialmethod\(%s, , \)\(\)' % cb_regex)
        regex = (r'^<Handle %s at %s:%s>$'
                 % (cb_regex, re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

    def test_handle_repr_debug(self):
        self.loop.get_debug.return_value = True

        # simple function
        # NOTE: create_lineno must be computed on the line directly above
        # the Handle() call, since it is derived from the current line + 1.
        create_filename = __file__
        create_lineno = sys._getframe().f_lineno + 1
        h = asyncio.Handle(noop, (1, 2), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<Handle noop(1, 2) at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(
            repr(h),
            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
            % (filename, lineno, create_filename, create_lineno))

        # double cancellation won't overwrite _repr
        h.cancel()
        self.assertEqual(
            repr(h),
            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
            % (filename, lineno, create_filename, create_lineno))

    def test_handle_source_traceback(self):
        loop = asyncio.get_event_loop_policy().new_event_loop()
        loop.set_debug(True)
        self.set_event_loop(loop)

        def check_source_traceback(h):
            # The handle must have been created on the line directly above
            # the call to this helper: the expected line number is the
            # caller's current line minus one.
            lineno = sys._getframe(1).f_lineno - 1
            self.assertIsInstance(h._source_traceback, list)
            self.assertEqual(h._source_traceback[-1][:3],
                             (__file__,
                              lineno,
                              'test_handle_source_traceback'))

        # call_soon
        h = loop.call_soon(noop)
        check_source_traceback(h)

        # call_soon_threadsafe
        h = loop.call_soon_threadsafe(noop)
        check_source_traceback(h)

        # call_later
        h = loop.call_later(0, noop)
        check_source_traceback(h)

        # call_at
        h = loop.call_at(loop.time(), noop)
        check_source_traceback(h)

    @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
                         'No collections.abc.Coroutine')
    def test_coroutine_like_object_debug_formatting(self):
        # Test that asyncio can format coroutines that are instances of
        # collections.abc.Coroutine, but lack cr_code or gi_code attributes
        # (such as ones compiled with Cython).

        coro = CoroLike()
        coro.__name__ = 'AAA'
        self.assertTrue(asyncio.iscoroutine(coro))
        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')

        coro.__qualname__ = 'BBB'
        self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')

        coro.cr_running = True
        self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')

        coro.__name__ = coro.__qualname__ = None
        self.assertEqual(coroutines._format_coroutine(coro),
                         '<CoroLike without __name__>() running')

        coro = CoroLike()
        coro.__qualname__ = 'CoroLike'
        # Some coroutines might not have '__name__', such as
        # built-in async_gen.asend().
        self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')

        coro = CoroLike()
        coro.__qualname__ = 'AAA'
        coro.cr_code = None
        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2267
2268
class TimerTests(unittest.TestCase):
    """Tests for asyncio.TimerHandle: hashing, repr, cancel and ordering."""

    def setUp(self):
        super().setUp()
        self.loop = mock.Mock()

    def test_hash(self):
        # The hash of a timer handle equals the hash of its 'when' value.
        when = time.monotonic()
        h = asyncio.TimerHandle(when, lambda: False, (),
                                mock.Mock())
        self.assertEqual(hash(h), hash(when))

    def test_when(self):
        when = time.monotonic()
        h = asyncio.TimerHandle(when, lambda: False, (),
                                mock.Mock())
        self.assertEqual(when, h.when())

    def test_timer(self):
        def callback(*args):
            return args

        args = (1, 2, 3)
        when = time.monotonic()
        h = asyncio.TimerHandle(when, callback, args, mock.Mock())
        self.assertIs(h._callback, callback)
        self.assertIs(h._args, args)
        self.assertFalse(h.cancelled())

        # cancel
        h.cancel()
        self.assertTrue(h.cancelled())
        self.assertIsNone(h._callback)
        self.assertIsNone(h._args)

        # when cannot be None
        self.assertRaises(AssertionError,
                          asyncio.TimerHandle, None, callback, args,
                          self.loop)

    def test_timer_repr(self):
        self.loop.get_debug.return_value = False

        # simple function
        h = asyncio.TimerHandle(123, noop, (), self.loop)
        src = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<TimerHandle when=123 noop() at %s:%s>' % src)

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<TimerHandle cancelled when=123>')

    def test_timer_repr_debug(self):
        self.loop.get_debug.return_value = True

        # simple function
        # NOTE: create_lineno must be computed on the line directly above
        # the TimerHandle() call, since it is the current line + 1.
        create_filename = __file__
        create_lineno = sys._getframe().f_lineno + 1
        h = asyncio.TimerHandle(123, noop, (), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<TimerHandle when=123 noop() '
                        'at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<TimerHandle cancelled when=123 noop() '
                        'at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

    def test_timer_comparison(self):
        def callback(*args):
            return args

        when = time.monotonic()

        # Positive checks use the dedicated assert methods for better
        # failure messages.  Negative checks stay as assertFalse(<op>) so
        # that each one still exercises exactly that comparison operator
        # (e.g. assertGreaterEqual would call >= instead of <).

        # Two handles scheduled for the same time compare equal.
        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
        h2 = asyncio.TimerHandle(when, callback, (), self.loop)
        self.assertFalse(h1 < h2)
        self.assertFalse(h2 < h1)
        self.assertLessEqual(h1, h2)
        self.assertLessEqual(h2, h1)
        self.assertFalse(h1 > h2)
        self.assertFalse(h2 > h1)
        self.assertGreaterEqual(h1, h2)
        self.assertGreaterEqual(h2, h1)
        self.assertEqual(h1, h2)
        self.assertFalse(h1 != h2)

        # A cancelled handle no longer equals its live twin.
        h2.cancel()
        self.assertFalse(h1 == h2)

        # Handles scheduled at different times are ordered by time.
        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
        h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
        self.assertLess(h1, h2)
        self.assertFalse(h2 < h1)
        self.assertLessEqual(h1, h2)
        self.assertFalse(h2 <= h1)
        self.assertFalse(h1 > h2)
        self.assertGreater(h2, h1)
        self.assertFalse(h1 >= h2)
        self.assertGreaterEqual(h2, h1)
        self.assertFalse(h1 == h2)
        self.assertNotEqual(h1, h2)

        # Equality against a plain Handle is not implemented.
        h3 = asyncio.Handle(callback, (), self.loop)
        self.assertIs(NotImplemented, h1.__eq__(h3))
        self.assertIs(NotImplemented, h1.__ne__(h3))
2383
2384
class AbstractEventLoopTests(unittest.TestCase):
    """Every method of a bare ``AbstractEventLoop`` must be unimplemented."""

    def test_not_implemented(self):
        """Synchronous loop methods raise ``NotImplementedError``."""
        f = mock.Mock()
        loop = asyncio.AbstractEventLoop()

        # (method name, positional arguments).  The arguments are only
        # placeholders: the call is expected to raise before using them.
        calls = [
            ('run_forever', ()),
            ('run_until_complete', (None,)),
            ('stop', ()),
            ('is_running', ()),
            ('is_closed', ()),
            ('close', ()),
            ('create_task', (None,)),
            ('call_later', (None, None)),
            ('call_at', (f, f)),
            ('call_soon', (None,)),
            ('time', ()),
            ('call_soon_threadsafe', (None,)),
            ('set_default_executor', (f,)),
            ('add_reader', (1, f)),
            ('remove_reader', (1,)),
            ('add_writer', (1, f)),
            ('remove_writer', (1,)),
            ('add_signal_handler', (1, f)),
            ('remove_signal_handler', (1,)),
            ('set_exception_handler', (f,)),
            ('default_exception_handler', (f,)),
            ('call_exception_handler', (f,)),
            ('get_debug', ()),
            ('set_debug', (f,)),
        ]
        for name, args in calls:
            # msg=name makes a failure say which method did not raise.
            with self.assertRaises(NotImplementedError, msg=name):
                getattr(loop, name)(*args)

    def test_not_implemented_async(self):
        """Awaitable loop methods raise ``NotImplementedError``."""

        async def inner():
            f = mock.Mock()
            abstract_loop = asyncio.AbstractEventLoop()

            # Same layout as in test_not_implemented: (name, arguments).
            calls = [
                ('run_in_executor', (f, f)),
                ('getaddrinfo', ('localhost', 8080)),
                ('getnameinfo', (('localhost', 8080),)),
                ('create_connection', (f,)),
                ('create_server', (f,)),
                ('create_datagram_endpoint', (f,)),
                ('sock_recv', (f, 10)),
                ('sock_recv_into', (f, 10)),
                ('sock_sendall', (f, 10)),
                ('sock_connect', (f, f)),
                ('sock_accept', (f,)),
                ('sock_sendfile', (f, f)),
                ('sendfile', (f, f)),
                ('connect_read_pipe', (f, mock.sentinel.pipe)),
                ('connect_write_pipe', (f, mock.sentinel.pipe)),
                ('subprocess_shell', (f, mock.sentinel)),
                ('subprocess_exec', (f,)),
            ]
            for name, args in calls:
                with self.assertRaises(NotImplementedError, msg=name):
                    await getattr(abstract_loop, name)(*args)

        # A real loop is only needed to drive the coroutine above.
        loop = asyncio.new_event_loop()
        # Registered before running so the loop is closed even when one of
        # the assertions inside inner() fails.
        self.addCleanup(loop.close)
        loop.run_until_complete(inner())
2485
2486
class PolicyTests(unittest.TestCase):
    """Tests for the abstract and the default event loop policy."""

    def test_event_loop_policy(self):
        """The abstract policy implements none of its methods."""
        abstract = asyncio.AbstractEventLoopPolicy()
        with self.assertRaises(NotImplementedError):
            abstract.get_event_loop()
        with self.assertRaises(NotImplementedError):
            abstract.set_event_loop(object())
        with self.assertRaises(NotImplementedError):
            abstract.new_event_loop()
        with self.assertRaises(NotImplementedError):
            abstract.get_child_watcher()
        with self.assertRaises(NotImplementedError):
            abstract.set_child_watcher(object())

    def test_get_event_loop(self):
        """get_event_loop() creates a loop once and then keeps returning it."""
        default_policy = asyncio.DefaultEventLoopPolicy()
        self.assertIsNone(default_policy._local._loop)

        created = default_policy.get_event_loop()
        self.assertIsInstance(created, asyncio.AbstractEventLoop)

        self.assertIs(default_policy._local._loop, created)
        self.assertIs(created, default_policy.get_event_loop())
        created.close()

    def test_get_event_loop_calls_set_event_loop(self):
        """The implicitly created loop is installed via set_event_loop()."""
        default_policy = asyncio.DefaultEventLoopPolicy()

        spy_patch = mock.patch.object(
            default_policy, "set_event_loop",
            wraps=default_policy.set_event_loop)
        with spy_patch as set_loop_spy:
            created = default_policy.get_event_loop()

            # policy._local._loop must be set through .set_event_loop()
            # (the unix DefaultEventLoopPolicy needs this call to attach
            # the child watcher correctly)
            set_loop_spy.assert_called_with(created)

        created.close()

    def test_get_event_loop_after_set_none(self):
        """After set_event_loop(None) no loop is created implicitly."""
        default_policy = asyncio.DefaultEventLoopPolicy()
        default_policy.set_event_loop(None)
        with self.assertRaises(RuntimeError):
            default_policy.get_event_loop()

    @mock.patch('asyncio.events.threading.current_thread')
    def test_get_event_loop_thread(self, m_current_thread):
        """With current_thread patched, get_event_loop() raises in a thread."""

        def worker():
            thread_policy = asyncio.DefaultEventLoopPolicy()
            with self.assertRaises(RuntimeError):
                thread_policy.get_event_loop()

        runner = threading.Thread(target=worker)
        runner.start()
        runner.join()

    def test_new_event_loop(self):
        """new_event_loop() hands out an AbstractEventLoop instance."""
        fresh = asyncio.DefaultEventLoopPolicy().new_event_loop()
        self.assertIsInstance(fresh, asyncio.AbstractEventLoop)
        fresh.close()

    def test_set_event_loop(self):
        """set_event_loop() rejects non-loops and replaces the current loop."""
        default_policy = asyncio.DefaultEventLoopPolicy()
        previous = default_policy.get_event_loop()

        with self.assertRaises(AssertionError):
            default_policy.set_event_loop(object())

        replacement = default_policy.new_event_loop()
        default_policy.set_event_loop(replacement)
        self.assertIs(replacement, default_policy.get_event_loop())
        self.assertIsNot(previous, default_policy.get_event_loop())
        replacement.close()
        previous.close()

    def test_get_event_loop_policy(self):
        """The global policy is a policy object and is stable across calls."""
        current = asyncio.get_event_loop_policy()
        self.assertIsInstance(current, asyncio.AbstractEventLoopPolicy)
        self.assertIs(current, asyncio.get_event_loop_policy())

    def test_set_event_loop_policy(self):
        """set_event_loop_policy() validates its argument and installs it."""
        with self.assertRaises(AssertionError):
            asyncio.set_event_loop_policy(object())

        previous = asyncio.get_event_loop_policy()

        installed = asyncio.DefaultEventLoopPolicy()
        asyncio.set_event_loop_policy(installed)
        self.assertIs(installed, asyncio.get_event_loop_policy())
        self.assertIsNot(installed, previous)
2576
2577
class GetEventLoopTestsMixin:
    """Shared tests for one implementation of the loop accessor functions.

    Concrete subclasses set the four ``*_impl`` attributes below.  ``setUp``
    installs them on both ``asyncio.events`` and ``asyncio``; ``tearDown``
    puts back the functions that ``asyncio.events`` held before.
    """

    _get_running_loop_impl = None
    _set_running_loop_impl = None
    get_running_loop_impl = None
    get_event_loop_impl = None

    def setUp(self):
        """Swap in the implementation under test and create a fresh loop."""
        # Remember what asyncio.events currently exposes so tearDown can
        # restore it.
        self._get_running_loop_saved = events._get_running_loop
        self._set_running_loop_saved = events._set_running_loop
        self.get_running_loop_saved = events.get_running_loop
        self.get_event_loop_saved = events.get_event_loop

        # Read the attributes from the class, not the instance, so plain
        # functions are not turned into bound methods.
        events._get_running_loop = type(self)._get_running_loop_impl
        events._set_running_loop = type(self)._set_running_loop_impl
        events.get_running_loop = type(self).get_running_loop_impl
        events.get_event_loop = type(self).get_event_loop_impl

        asyncio._get_running_loop = type(self)._get_running_loop_impl
        asyncio._set_running_loop = type(self)._set_running_loop_impl
        asyncio.get_running_loop = type(self).get_running_loop_impl
        asyncio.get_event_loop = type(self).get_event_loop_impl

        super().setUp()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        if sys.platform != 'win32':
            watcher = asyncio.SafeChildWatcher()
            watcher.attach_loop(self.loop)
            asyncio.set_child_watcher(watcher)

    def tearDown(self):
        """Close the loop and restore the saved accessor functions."""
        try:
            if sys.platform != 'win32':
                asyncio.set_child_watcher(None)

            super().tearDown()
        finally:
            # Runs even if the steps above fail, so the patched functions
            # are always put back.
            self.loop.close()
            asyncio.set_event_loop(None)

            events._get_running_loop = self._get_running_loop_saved
            events._set_running_loop = self._set_running_loop_saved
            events.get_running_loop = self.get_running_loop_saved
            events.get_event_loop = self.get_event_loop_saved

            asyncio._get_running_loop = self._get_running_loop_saved
            asyncio._set_running_loop = self._set_running_loop_saved
            asyncio.get_running_loop = self.get_running_loop_saved
            asyncio.get_event_loop = self.get_event_loop_saved

    if sys.platform != 'win32':

        def test_get_event_loop_new_process(self):
            """A worker process can create and run its own event loop."""
            # Issue bpo-32126: The multiprocessing module used by
            # ProcessPoolExecutor is not functional when the
            # multiprocessing.synchronize module cannot be imported.
            support.import_module('multiprocessing.synchronize')

            async def main():
                # The context manager shuts the pool down on exit, also
                # when the executor call raises.
                with concurrent.futures.ProcessPoolExecutor() as pool:
                    return await self.loop.run_in_executor(
                        pool, _test_get_event_loop_new_process__sub_proc)

            self.assertEqual(
                self.loop.run_until_complete(main()),
                'hello')

    def test_get_event_loop_returns_running_loop(self):
        """get_event_loop() prefers the running loop over the policy.

        A policy whose ``get_event_loop`` always raises is installed:
        outside of a running loop the error must surface, inside a running
        coroutine the running loop must be returned instead.
        """
        class TestError(Exception):
            pass

        class Policy(asyncio.DefaultEventLoopPolicy):
            def get_event_loop(self):
                raise TestError

        old_policy = asyncio.get_event_loop_policy()
        # Bound before the try block so the finally clause can test it even
        # when installing the policy or creating the loop fails; otherwise
        # an UnboundLocalError would hide the real error.
        loop = None
        try:
            asyncio.set_event_loop_policy(Policy())
            loop = asyncio.new_event_loop()

            with self.assertRaises(TestError):
                asyncio.get_event_loop()
            asyncio.set_event_loop(None)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

            with self.assertRaisesRegex(RuntimeError, 'no running'):
                self.assertIs(asyncio.get_running_loop(), None)
            self.assertIs(asyncio._get_running_loop(), None)

            async def func():
                self.assertIs(asyncio.get_event_loop(), loop)
                self.assertIs(asyncio.get_running_loop(), loop)
                self.assertIs(asyncio._get_running_loop(), loop)

            loop.run_until_complete(func())

            asyncio.set_event_loop(loop)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

            asyncio.set_event_loop(None)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

        finally:
            asyncio.set_event_loop_policy(old_policy)
            if loop is not None:
                loop.close()

        with self.assertRaisesRegex(RuntimeError, 'no running'):
            self.assertIs(asyncio.get_running_loop(), None)

        self.assertIs(asyncio._get_running_loop(), None)
2697
2698
class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
    """Run the shared accessor tests against the ``_py_*`` functions."""

    # Public accessors.
    get_event_loop_impl = events._py_get_event_loop
    get_running_loop_impl = events._py_get_running_loop

    # Private running-loop getter/setter.
    _set_running_loop_impl = events._py__set_running_loop
    _get_running_loop_impl = events._py__get_running_loop
2705
2706
try:
    import _asyncio  # NoQA
except ImportError:
    # _asyncio cannot be imported, so the ``_c_*`` test case is not defined.
    pass
else:

    class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
        """Run the shared accessor tests against the ``_c_*`` functions."""

        # Public accessors.
        get_event_loop_impl = events._c_get_event_loop
        get_running_loop_impl = events._c_get_running_loop

        # Private running-loop getter/setter.
        _set_running_loop_impl = events._c__set_running_loop
        _get_running_loop_impl = events._c__get_running_loop
2719
2720
class TestServer(unittest.TestCase):
    """Tests for the server object returned by ``loop.create_server()``."""

    def test_get_loop(self):
        """``get_loop()`` returns the loop the server was created on."""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)
        proto = MyProto(loop)
        server = loop.run_until_complete(
            loop.create_server(lambda: proto, '0.0.0.0', 0))
        try:
            self.assertEqual(server.get_loop(), loop)
        finally:
            # Close the server even when the assertion fails, so it is not
            # still open when the cleanup closes the loop.
            server.close()
            loop.run_until_complete(server.wait_closed())
2731
2732
class TestAbstractServer(unittest.TestCase):
    """``events.AbstractServer`` leaves all of its methods unimplemented."""

    def test_close(self):
        """close() raises NotImplementedError."""
        with self.assertRaises(NotImplementedError):
            server = events.AbstractServer()
            server.close()

    def test_wait_closed(self):
        """Running wait_closed() on a loop raises NotImplementedError."""
        runner = asyncio.new_event_loop()
        self.addCleanup(runner.close)

        with self.assertRaises(NotImplementedError):
            pending = events.AbstractServer().wait_closed()
            runner.run_until_complete(pending)

    def test_get_loop(self):
        """get_loop() raises NotImplementedError."""
        with self.assertRaises(NotImplementedError):
            server = events.AbstractServer()
            server.get_loop()
2749
2750
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
2753