• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for base_events.py"""
2
3import concurrent.futures
4import errno
5import math
6import socket
7import sys
8import threading
9import time
10import unittest
11from unittest import mock
12
13import asyncio
14from asyncio import base_events
15from asyncio import constants
16from test.test_asyncio import utils as test_utils
17from test import support
18from test.support.script_helper import assert_python_ok
19from test.support import os_helper
20from test.support import socket_helper
21
22
# Wildcard used in expected mock-call arguments; compares equal to anything.
MOCK_ANY = mock.ANY
# True when the running interpreter is Python 3.4 or newer.
PY34 = (3, 4) <= sys.version_info
25
26
def tearDownModule():
    """Module-level teardown hook: pass None to asyncio's policy setter.

    Called with no arguments; returns None.
    """
    asyncio.set_event_loop_policy(None)
29
30
def mock_socket_module():
    """Return a MagicMock standing in for the ``socket`` module.

    A fixed set of constants (and ``inet_pton``) is copied over from the real
    module when the platform provides them and deleted from the mock when it
    does not.  ``socket()`` on the mock returns the object produced by
    ``test_utils.mock_nonblocking_socket()``.
    """
    real_names = (
        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
    )
    fake = mock.MagicMock(spec=socket)
    for attr in real_names:
        try:
            real_value = getattr(socket, attr)
        except AttributeError:
            # Not available on this platform: make the mock lack it too.
            delattr(fake, attr)
        else:
            setattr(fake, attr, real_value)

    fake.socket = mock.MagicMock()
    fake.socket.return_value = test_utils.mock_nonblocking_socket()
    fake.getaddrinfo._is_coroutine = False

    return fake
47
48
def patch_socket(f):
    """Decorate *f* so that ``asyncio.base_events.socket`` is patched.

    The replacement object is created by ``mock_socket_module``; the value
    returned is whatever the ``mock.patch`` decorator returns for *f*.
    """
    patcher = mock.patch('asyncio.base_events.socket',
                         new_callable=mock_socket_module)
    return patcher(f)
52
53
class BaseEventTests(test_utils.TestCase):
    """Checks for the ``base_events._ipaddr_info`` helper."""

    def test_ipaddr_info(self):
        UNSPEC = socket.AF_UNSPEC
        INET = socket.AF_INET
        INET6 = socket.AF_INET6
        STREAM = socket.SOCK_STREAM
        DGRAM = socket.SOCK_DGRAM
        TCP = socket.IPPROTO_TCP
        UDP = socket.IPPROTO_UDP

        tcp_info = (INET, STREAM, TCP, '', ('1.2.3.4', 1))
        udp_info = (INET, DGRAM, UDP, '', ('1.2.3.4', 1))

        # ((host, family, type, proto), expected result); port is always 1.
        resolvable = (
            (('1.2.3.4', INET, STREAM, TCP), tcp_info),
            ((b'1.2.3.4', INET, STREAM, TCP), tcp_info),
            (('1.2.3.4', UNSPEC, STREAM, TCP), tcp_info),
            (('1.2.3.4', UNSPEC, DGRAM, UDP), udp_info),
            # Socket type STREAM implies TCP protocol.
            (('1.2.3.4', UNSPEC, STREAM, 0), tcp_info),
            # Socket type DGRAM implies UDP protocol.
            (('1.2.3.4', UNSPEC, DGRAM, 0), udp_info),
        )
        for (host, family, sock_type, proto), expected in resolvable:
            self.assertEqual(
                expected,
                base_events._ipaddr_info(host, 1, family, sock_type, proto))

        # No socket type.
        self.assertIsNone(
            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))

        if not socket_helper.IPV6_ENABLED:
            return

        # IPv4 address with family IPv6.
        self.assertIsNone(
            base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))

        ipv6_info = (INET6, STREAM, TCP, '', ('::3', 1, 0, 0))
        for family in (INET6, UNSPEC):
            self.assertEqual(
                ipv6_info,
                base_events._ipaddr_info('::3', 1, family, STREAM, TCP))

        # IPv6 address with family IPv4.
        self.assertIsNone(
            base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))

        # IPv6 address with zone index.
        self.assertIsNone(
            base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))

    def test_port_parameter_types(self):
        # Test obscure kinds of arguments for "port".
        INET = socket.AF_INET
        STREAM = socket.SOCK_STREAM
        TCP = socket.IPPROTO_TCP

        # (port argument as passed in, port expected in the result)
        port_cases = (
            (None, 0),
            (b'', 0),
            ('', 0),
            ('1', 1),
            (b'1', 1),
        )
        for port, resolved_port in port_cases:
            self.assertEqual(
                (INET, STREAM, TCP, '', ('1.2.3.4', resolved_port)),
                base_events._ipaddr_info('1.2.3.4', port, INET, STREAM, TCP))

    @patch_socket
    def test_ipaddr_info_no_inet_pton(self, m_socket):
        # With inet_pton removed from the patched socket module the helper
        # is expected to return None.
        delattr(m_socket, 'inet_pton')
        info = base_events._ipaddr_info('1.2.3.4', 1,
                                        socket.AF_INET,
                                        socket.SOCK_STREAM,
                                        socket.IPPROTO_TCP)
        self.assertIsNone(info)
150
151class BaseEventLoopTests(test_utils.TestCase):
152
153    def setUp(self):
154        super().setUp()
155        self.loop = base_events.BaseEventLoop()
156        self.loop._selector = mock.Mock()
157        self.loop._selector.select.return_value = ()
158        self.set_event_loop(self.loop)
159
160    def test_not_implemented(self):
161        m = mock.Mock()
162        self.assertRaises(
163            NotImplementedError,
164            self.loop._make_socket_transport, m, m)
165        self.assertRaises(
166            NotImplementedError,
167            self.loop._make_ssl_transport, m, m, m, m)
168        self.assertRaises(
169            NotImplementedError,
170            self.loop._make_datagram_transport, m, m)
171        self.assertRaises(
172            NotImplementedError, self.loop._process_events, [])
173        self.assertRaises(
174            NotImplementedError, self.loop._write_to_self)
175        self.assertRaises(
176            NotImplementedError,
177            self.loop._make_read_pipe_transport, m, m)
178        self.assertRaises(
179            NotImplementedError,
180            self.loop._make_write_pipe_transport, m, m)
181        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
182        with self.assertRaises(NotImplementedError):
183            gen.send(None)
184
185    def test_close(self):
186        self.assertFalse(self.loop.is_closed())
187        self.loop.close()
188        self.assertTrue(self.loop.is_closed())
189
190        # it should be possible to call close() more than once
191        self.loop.close()
192        self.loop.close()
193
194        # operation blocked when the loop is closed
195        f = self.loop.create_future()
196        self.assertRaises(RuntimeError, self.loop.run_forever)
197        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
198
199    def test__add_callback_handle(self):
200        h = asyncio.Handle(lambda: False, (), self.loop, None)
201
202        self.loop._add_callback(h)
203        self.assertFalse(self.loop._scheduled)
204        self.assertIn(h, self.loop._ready)
205
206    def test__add_callback_cancelled_handle(self):
207        h = asyncio.Handle(lambda: False, (), self.loop, None)
208        h.cancel()
209
210        self.loop._add_callback(h)
211        self.assertFalse(self.loop._scheduled)
212        self.assertFalse(self.loop._ready)
213
214    def test_set_default_executor(self):
215        class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
216            def submit(self, fn, *args, **kwargs):
217                raise NotImplementedError(
218                    'cannot submit into a dummy executor')
219
220        self.loop._process_events = mock.Mock()
221        self.loop._write_to_self = mock.Mock()
222
223        executor = DummyExecutor()
224        self.loop.set_default_executor(executor)
225        self.assertIs(executor, self.loop._default_executor)
226
227    def test_set_default_executor_deprecation_warnings(self):
228        executor = mock.Mock()
229
230        with self.assertWarns(DeprecationWarning):
231            self.loop.set_default_executor(executor)
232
233        # Avoid cleaning up the executor mock
234        self.loop._default_executor = None
235
236    def test_call_soon(self):
237        def cb():
238            pass
239
240        h = self.loop.call_soon(cb)
241        self.assertEqual(h._callback, cb)
242        self.assertIsInstance(h, asyncio.Handle)
243        self.assertIn(h, self.loop._ready)
244
245    def test_call_soon_non_callable(self):
246        self.loop.set_debug(True)
247        with self.assertRaisesRegex(TypeError, 'a callable object'):
248            self.loop.call_soon(1)
249
250    def test_call_later(self):
251        def cb():
252            pass
253
254        h = self.loop.call_later(10.0, cb)
255        self.assertIsInstance(h, asyncio.TimerHandle)
256        self.assertIn(h, self.loop._scheduled)
257        self.assertNotIn(h, self.loop._ready)
258
259    def test_call_later_negative_delays(self):
260        calls = []
261
262        def cb(arg):
263            calls.append(arg)
264
265        self.loop._process_events = mock.Mock()
266        self.loop.call_later(-1, cb, 'a')
267        self.loop.call_later(-2, cb, 'b')
268        test_utils.run_briefly(self.loop)
269        self.assertEqual(calls, ['b', 'a'])
270
271    def test_time_and_call_at(self):
272        def cb():
273            self.loop.stop()
274
275        self.loop._process_events = mock.Mock()
276        delay = 0.1
277
278        when = self.loop.time() + delay
279        self.loop.call_at(when, cb)
280        t0 = self.loop.time()
281        self.loop.run_forever()
282        dt = self.loop.time() - t0
283
284        # 50 ms: maximum granularity of the event loop
285        self.assertGreaterEqual(dt, delay - 0.050, dt)
286        # tolerate a difference of +800 ms because some Python buildbots
287        # are really slow
288        self.assertLessEqual(dt, 0.9, dt)
289
290    def check_thread(self, loop, debug):
291        def cb():
292            pass
293
294        loop.set_debug(debug)
295        if debug:
296            msg = ("Non-thread-safe operation invoked on an event loop other "
297                   "than the current one")
298            with self.assertRaisesRegex(RuntimeError, msg):
299                loop.call_soon(cb)
300            with self.assertRaisesRegex(RuntimeError, msg):
301                loop.call_later(60, cb)
302            with self.assertRaisesRegex(RuntimeError, msg):
303                loop.call_at(loop.time() + 60, cb)
304        else:
305            loop.call_soon(cb)
306            loop.call_later(60, cb)
307            loop.call_at(loop.time() + 60, cb)
308
309    def test_check_thread(self):
310        def check_in_thread(loop, event, debug, create_loop, fut):
311            # wait until the event loop is running
312            event.wait()
313
314            try:
315                if create_loop:
316                    loop2 = base_events.BaseEventLoop()
317                    try:
318                        asyncio.set_event_loop(loop2)
319                        self.check_thread(loop, debug)
320                    finally:
321                        asyncio.set_event_loop(None)
322                        loop2.close()
323                else:
324                    self.check_thread(loop, debug)
325            except Exception as exc:
326                loop.call_soon_threadsafe(fut.set_exception, exc)
327            else:
328                loop.call_soon_threadsafe(fut.set_result, None)
329
330        def test_thread(loop, debug, create_loop=False):
331            event = threading.Event()
332            fut = loop.create_future()
333            loop.call_soon(event.set)
334            args = (loop, event, debug, create_loop, fut)
335            thread = threading.Thread(target=check_in_thread, args=args)
336            thread.start()
337            loop.run_until_complete(fut)
338            thread.join()
339
340        self.loop._process_events = mock.Mock()
341        self.loop._write_to_self = mock.Mock()
342
343        # raise RuntimeError if the thread has no event loop
344        test_thread(self.loop, True)
345
346        # check disabled if debug mode is disabled
347        test_thread(self.loop, False)
348
349        # raise RuntimeError if the event loop of the thread is not the called
350        # event loop
351        test_thread(self.loop, True, create_loop=True)
352
353        # check disabled if debug mode is disabled
354        test_thread(self.loop, False, create_loop=True)
355
356    def test__run_once(self):
357        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
358                                 self.loop, None)
359        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
360                                 self.loop, None)
361
362        h1.cancel()
363
364        self.loop._process_events = mock.Mock()
365        self.loop._scheduled.append(h1)
366        self.loop._scheduled.append(h2)
367        self.loop._run_once()
368
369        t = self.loop._selector.select.call_args[0][0]
370        self.assertTrue(9.5 < t < 10.5, t)
371        self.assertEqual([h2], self.loop._scheduled)
372        self.assertTrue(self.loop._process_events.called)
373
374    def test_set_debug(self):
375        self.loop.set_debug(True)
376        self.assertTrue(self.loop.get_debug())
377        self.loop.set_debug(False)
378        self.assertFalse(self.loop.get_debug())
379
380    def test__run_once_schedule_handle(self):
381        handle = None
382        processed = False
383
384        def cb(loop):
385            nonlocal processed, handle
386            processed = True
387            handle = loop.call_soon(lambda: True)
388
389        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
390                                self.loop, None)
391
392        self.loop._process_events = mock.Mock()
393        self.loop._scheduled.append(h)
394        self.loop._run_once()
395
396        self.assertTrue(processed)
397        self.assertEqual([handle], list(self.loop._ready))
398
399    def test__run_once_cancelled_event_cleanup(self):
400        self.loop._process_events = mock.Mock()
401
402        self.assertTrue(
403            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
404
405        def cb():
406            pass
407
408        # Set up one "blocking" event that will not be cancelled to
409        # ensure later cancelled events do not make it to the head
410        # of the queue and get cleaned.
411        not_cancelled_count = 1
412        self.loop.call_later(3000, cb)
413
414        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
415        # cancelled handles, ensure they aren't removed
416
417        cancelled_count = 2
418        for x in range(2):
419            h = self.loop.call_later(3600, cb)
420            h.cancel()
421
422        # Add some cancelled events that will be at head and removed
423        cancelled_count += 2
424        for x in range(2):
425            h = self.loop.call_later(100, cb)
426            h.cancel()
427
428        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
429        self.assertLessEqual(cancelled_count + not_cancelled_count,
430            base_events._MIN_SCHEDULED_TIMER_HANDLES)
431
432        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
433
434        self.loop._run_once()
435
436        cancelled_count -= 2
437
438        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
439
440        self.assertEqual(len(self.loop._scheduled),
441            cancelled_count + not_cancelled_count)
442
443        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
444        # so that deletion of cancelled events will occur on next _run_once
445        add_cancel_count = int(math.ceil(
446            base_events._MIN_SCHEDULED_TIMER_HANDLES *
447            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
448
449        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
450            add_cancel_count, 0)
451
452        # Add some events that will not be cancelled
453        not_cancelled_count += add_not_cancel_count
454        for x in range(add_not_cancel_count):
455            self.loop.call_later(3600, cb)
456
457        # Add enough cancelled events
458        cancelled_count += add_cancel_count
459        for x in range(add_cancel_count):
460            h = self.loop.call_later(3600, cb)
461            h.cancel()
462
463        # Ensure all handles are still scheduled
464        self.assertEqual(len(self.loop._scheduled),
465            cancelled_count + not_cancelled_count)
466
467        self.loop._run_once()
468
469        # Ensure cancelled events were removed
470        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
471
472        # Ensure only uncancelled events remain scheduled
473        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
474
475    def test_run_until_complete_type_error(self):
476        self.assertRaises(TypeError,
477            self.loop.run_until_complete, 'blah')
478
479    def test_run_until_complete_loop(self):
480        task = self.loop.create_future()
481        other_loop = self.new_test_loop()
482        self.addCleanup(other_loop.close)
483        self.assertRaises(ValueError,
484            other_loop.run_until_complete, task)
485
486    def test_run_until_complete_loop_orphan_future_close_loop(self):
487        class ShowStopper(SystemExit):
488            pass
489
490        async def foo(delay):
491            await asyncio.sleep(delay)
492
493        def throw():
494            raise ShowStopper
495
496        self.loop._process_events = mock.Mock()
497        self.loop.call_soon(throw)
498        with self.assertRaises(ShowStopper):
499            self.loop.run_until_complete(foo(0.1))
500
501        # This call fails if run_until_complete does not clean up
502        # done-callback for the previous future.
503        self.loop.run_until_complete(foo(0.2))
504
505    def test_subprocess_exec_invalid_args(self):
506        args = [sys.executable, '-c', 'pass']
507
508        # missing program parameter (empty args)
509        self.assertRaises(TypeError,
510            self.loop.run_until_complete, self.loop.subprocess_exec,
511            asyncio.SubprocessProtocol)
512
513        # expected multiple arguments, not a list
514        self.assertRaises(TypeError,
515            self.loop.run_until_complete, self.loop.subprocess_exec,
516            asyncio.SubprocessProtocol, args)
517
518        # program arguments must be strings, not int
519        self.assertRaises(TypeError,
520            self.loop.run_until_complete, self.loop.subprocess_exec,
521            asyncio.SubprocessProtocol, sys.executable, 123)
522
523        # universal_newlines, shell, bufsize must not be set
524        self.assertRaises(TypeError,
525        self.loop.run_until_complete, self.loop.subprocess_exec,
526            asyncio.SubprocessProtocol, *args, universal_newlines=True)
527        self.assertRaises(TypeError,
528            self.loop.run_until_complete, self.loop.subprocess_exec,
529            asyncio.SubprocessProtocol, *args, shell=True)
530        self.assertRaises(TypeError,
531            self.loop.run_until_complete, self.loop.subprocess_exec,
532            asyncio.SubprocessProtocol, *args, bufsize=4096)
533
534    def test_subprocess_shell_invalid_args(self):
535        # expected a string, not an int or a list
536        self.assertRaises(TypeError,
537            self.loop.run_until_complete, self.loop.subprocess_shell,
538            asyncio.SubprocessProtocol, 123)
539        self.assertRaises(TypeError,
540            self.loop.run_until_complete, self.loop.subprocess_shell,
541            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
542
543        # universal_newlines, shell, bufsize must not be set
544        self.assertRaises(TypeError,
545            self.loop.run_until_complete, self.loop.subprocess_shell,
546            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
547        self.assertRaises(TypeError,
548            self.loop.run_until_complete, self.loop.subprocess_shell,
549            asyncio.SubprocessProtocol, 'exit 0', shell=True)
550        self.assertRaises(TypeError,
551            self.loop.run_until_complete, self.loop.subprocess_shell,
552            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
553
554    def test_default_exc_handler_callback(self):
555        self.loop._process_events = mock.Mock()
556
557        def zero_error(fut):
558            fut.set_result(True)
559            1/0
560
561        # Test call_soon (events.Handle)
562        with mock.patch('asyncio.base_events.logger') as log:
563            fut = self.loop.create_future()
564            self.loop.call_soon(zero_error, fut)
565            fut.add_done_callback(lambda fut: self.loop.stop())
566            self.loop.run_forever()
567            log.error.assert_called_with(
568                test_utils.MockPattern('Exception in callback.*zero'),
569                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
570
571        # Test call_later (events.TimerHandle)
572        with mock.patch('asyncio.base_events.logger') as log:
573            fut = self.loop.create_future()
574            self.loop.call_later(0.01, zero_error, fut)
575            fut.add_done_callback(lambda fut: self.loop.stop())
576            self.loop.run_forever()
577            log.error.assert_called_with(
578                test_utils.MockPattern('Exception in callback.*zero'),
579                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
580
581    def test_default_exc_handler_coro(self):
582        self.loop._process_events = mock.Mock()
583
584        async def zero_error_coro():
585            await asyncio.sleep(0.01)
586            1/0
587
588        # Test Future.__del__
589        with mock.patch('asyncio.base_events.logger') as log:
590            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
591            fut.add_done_callback(lambda *args: self.loop.stop())
592            self.loop.run_forever()
593            fut = None # Trigger Future.__del__ or futures._TracebackLogger
594            support.gc_collect()
595            if PY34:
596                # Future.__del__ in Python 3.4 logs error with
597                # an actual exception context
598                log.error.assert_called_with(
599                    test_utils.MockPattern('.*exception was never retrieved'),
600                    exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
601            else:
602                # futures._TracebackLogger logs only textual traceback
603                log.error.assert_called_with(
604                    test_utils.MockPattern(
605                        '.*exception was never retrieved.*ZeroDiv'),
606                    exc_info=False)
607
608    def test_set_exc_handler_invalid(self):
609        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
610            self.loop.set_exception_handler('spam')
611
612    def test_set_exc_handler_custom(self):
613        def zero_error():
614            1/0
615
616        def run_loop():
617            handle = self.loop.call_soon(zero_error)
618            self.loop._run_once()
619            return handle
620
621        self.loop.set_debug(True)
622        self.loop._process_events = mock.Mock()
623
624        self.assertIsNone(self.loop.get_exception_handler())
625        mock_handler = mock.Mock()
626        self.loop.set_exception_handler(mock_handler)
627        self.assertIs(self.loop.get_exception_handler(), mock_handler)
628        handle = run_loop()
629        mock_handler.assert_called_with(self.loop, {
630            'exception': MOCK_ANY,
631            'message': test_utils.MockPattern(
632                                'Exception in callback.*zero_error'),
633            'handle': handle,
634            'source_traceback': handle._source_traceback,
635        })
636        mock_handler.reset_mock()
637
638        self.loop.set_exception_handler(None)
639        with mock.patch('asyncio.base_events.logger') as log:
640            run_loop()
641            log.error.assert_called_with(
642                        test_utils.MockPattern(
643                                'Exception in callback.*zero'),
644                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
645
646        self.assertFalse(mock_handler.called)
647
648    def test_set_exc_handler_broken(self):
649        def run_loop():
650            def zero_error():
651                1/0
652            self.loop.call_soon(zero_error)
653            self.loop._run_once()
654
655        def handler(loop, context):
656            raise AttributeError('spam')
657
658        self.loop._process_events = mock.Mock()
659
660        self.loop.set_exception_handler(handler)
661
662        with mock.patch('asyncio.base_events.logger') as log:
663            run_loop()
664            log.error.assert_called_with(
665                test_utils.MockPattern(
666                    'Unhandled error in exception handler'),
667                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
668
669    def test_default_exc_handler_broken(self):
670        _context = None
671
672        class Loop(base_events.BaseEventLoop):
673
674            _selector = mock.Mock()
675            _process_events = mock.Mock()
676
677            def default_exception_handler(self, context):
678                nonlocal _context
679                _context = context
680                # Simulates custom buggy "default_exception_handler"
681                raise ValueError('spam')
682
683        loop = Loop()
684        self.addCleanup(loop.close)
685        asyncio.set_event_loop(loop)
686
687        def run_loop():
688            def zero_error():
689                1/0
690            loop.call_soon(zero_error)
691            loop._run_once()
692
693        with mock.patch('asyncio.base_events.logger') as log:
694            run_loop()
695            log.error.assert_called_with(
696                'Exception in default exception handler',
697                exc_info=True)
698
699        def custom_handler(loop, context):
700            raise ValueError('ham')
701
702        _context = None
703        loop.set_exception_handler(custom_handler)
704        with mock.patch('asyncio.base_events.logger') as log:
705            run_loop()
706            log.error.assert_called_with(
707                test_utils.MockPattern('Exception in default exception.*'
708                                       'while handling.*in custom'),
709                exc_info=True)
710
711            # Check that original context was passed to default
712            # exception handler.
713            self.assertIn('context', _context)
714            self.assertIs(type(_context['context']['exception']),
715                          ZeroDivisionError)
716
717    def test_set_task_factory_invalid(self):
718        with self.assertRaisesRegex(
719            TypeError, 'task factory must be a callable or None'):
720
721            self.loop.set_task_factory(1)
722
723        self.assertIsNone(self.loop.get_task_factory())
724
725    def test_set_task_factory(self):
726        self.loop._process_events = mock.Mock()
727
728        class MyTask(asyncio.Task):
729            pass
730
731        async def coro():
732            pass
733
734        factory = lambda loop, coro: MyTask(coro, loop=loop)
735
736        self.assertIsNone(self.loop.get_task_factory())
737        self.loop.set_task_factory(factory)
738        self.assertIs(self.loop.get_task_factory(), factory)
739
740        task = self.loop.create_task(coro())
741        self.assertTrue(isinstance(task, MyTask))
742        self.loop.run_until_complete(task)
743
744        self.loop.set_task_factory(None)
745        self.assertIsNone(self.loop.get_task_factory())
746
747        task = self.loop.create_task(coro())
748        self.assertTrue(isinstance(task, asyncio.Task))
749        self.assertFalse(isinstance(task, MyTask))
750        self.loop.run_until_complete(task)
751
752    def test_env_var_debug(self):
753        code = '\n'.join((
754            'import asyncio',
755            'loop = asyncio.get_event_loop()',
756            'print(loop.get_debug())'))
757
758        # Test with -E to not fail if the unit test was run with
759        # PYTHONASYNCIODEBUG set to a non-empty string
760        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
761        self.assertEqual(stdout.rstrip(), b'False')
762
763        sts, stdout, stderr = assert_python_ok('-c', code,
764                                               PYTHONASYNCIODEBUG='',
765                                               PYTHONDEVMODE='')
766        self.assertEqual(stdout.rstrip(), b'False')
767
768        sts, stdout, stderr = assert_python_ok('-c', code,
769                                               PYTHONASYNCIODEBUG='1',
770                                               PYTHONDEVMODE='')
771        self.assertEqual(stdout.rstrip(), b'True')
772
773        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
774                                               PYTHONASYNCIODEBUG='1')
775        self.assertEqual(stdout.rstrip(), b'False')
776
777        # -X dev
778        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
779                                               '-c', code)
780        self.assertEqual(stdout.rstrip(), b'True')
781
782    def test_create_task(self):
783        class MyTask(asyncio.Task):
784            pass
785
786        async def test():
787            pass
788
789        class EventLoop(base_events.BaseEventLoop):
790            def create_task(self, coro):
791                return MyTask(coro, loop=loop)
792
793        loop = EventLoop()
794        self.set_event_loop(loop)
795
796        coro = test()
797        task = asyncio.ensure_future(coro, loop=loop)
798        self.assertIsInstance(task, MyTask)
799
800        # make warnings quiet
801        task._log_destroy_pending = False
802        coro.close()
803
804    def test_create_named_task_with_default_factory(self):
805        async def test():
806            pass
807
808        loop = asyncio.new_event_loop()
809        task = loop.create_task(test(), name='test_task')
810        try:
811            self.assertEqual(task.get_name(), 'test_task')
812        finally:
813            loop.run_until_complete(task)
814            loop.close()
815
816    def test_create_named_task_with_custom_factory(self):
817        def task_factory(loop, coro):
818            return asyncio.Task(coro, loop=loop)
819
820        async def test():
821            pass
822
823        loop = asyncio.new_event_loop()
824        loop.set_task_factory(task_factory)
825        task = loop.create_task(test(), name='test_task')
826        try:
827            self.assertEqual(task.get_name(), 'test_task')
828        finally:
829            loop.run_until_complete(task)
830            loop.close()
831
832    def test_run_forever_keyboard_interrupt(self):
833        # Python issue #22601: ensure that the temporary task created by
834        # run_forever() consumes the KeyboardInterrupt and so don't log
835        # a warning
836        async def raise_keyboard_interrupt():
837            raise KeyboardInterrupt
838
839        self.loop._process_events = mock.Mock()
840        self.loop.call_exception_handler = mock.Mock()
841
842        try:
843            self.loop.run_until_complete(raise_keyboard_interrupt())
844        except KeyboardInterrupt:
845            pass
846        self.loop.close()
847        support.gc_collect()
848
849        self.assertFalse(self.loop.call_exception_handler.called)
850
851    def test_run_until_complete_baseexception(self):
852        # Python issue #22429: run_until_complete() must not schedule a pending
853        # call to stop() if the future raised a BaseException
854        async def raise_keyboard_interrupt():
855            raise KeyboardInterrupt
856
857        self.loop._process_events = mock.Mock()
858
859        try:
860            self.loop.run_until_complete(raise_keyboard_interrupt())
861        except KeyboardInterrupt:
862            pass
863
864        def func():
865            self.loop.stop()
866            func.called = True
867        func.called = False
868        try:
869            self.loop.call_soon(func)
870            self.loop.run_forever()
871        except KeyboardInterrupt:
872            pass
873        self.assertTrue(func.called)
874
875    def test_single_selecter_event_callback_after_stopping(self):
876        # Python issue #25593: A stopped event loop may cause event callbacks
877        # to run more than once.
878        event_sentinel = object()
879        callcount = 0
880        doer = None
881
882        def proc_events(event_list):
883            nonlocal doer
884            if event_sentinel in event_list:
885                doer = self.loop.call_soon(do_event)
886
887        def do_event():
888            nonlocal callcount
889            callcount += 1
890            self.loop.call_soon(clear_selector)
891
892        def clear_selector():
893            doer.cancel()
894            self.loop._selector.select.return_value = ()
895
896        self.loop._process_events = proc_events
897        self.loop._selector.select.return_value = (event_sentinel,)
898
899        for i in range(1, 3):
900            with self.subTest('Loop %d/2' % i):
901                self.loop.call_soon(self.loop.stop)
902                self.loop.run_forever()
903                self.assertEqual(callcount, 1)
904
905    def test_run_once(self):
906        # Simple test for test_utils.run_once().  It may seem strange
907        # to have a test for this (the function isn't even used!) but
908        # it's a de-factor standard API for library tests.  This tests
909        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
910        count = 0
911
912        def callback():
913            nonlocal count
914            count += 1
915
916        self.loop._process_events = mock.Mock()
917        self.loop.call_soon(callback)
918        test_utils.run_once(self.loop)
919        self.assertEqual(count, 1)
920
921    def test_run_forever_pre_stopped(self):
922        # Test that the old idiom for pre-stopping the loop works.
923        self.loop._process_events = mock.Mock()
924        self.loop.stop()
925        self.loop.run_forever()
926        self.loop._selector.select.assert_called_once_with(0)
927
    async def leave_unfinalized_asyncgen(self):
        # Create an async generator, iterate it partially, and leave it
        # to be garbage collected.
        # Used in async generator finalization tests.
        # Depends on implementation details of garbage collector. Changes
        # in gc may break this function.
        #
        # Returns the ``status`` dict right after scheduling the first
        # iteration task; callers watch its flags change as the loop runs:
        #   'started'   - the generator body began executing
        #   'stopped'   - iteration was abandoned on reaching 'THREE'
        #   'finalized' - the generator's ``finally`` block ran
        status = {'started': False,
                  'stopped': False,
                  'finalized': False}

        async def agen():
            status['started'] = True
            try:
                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
                    yield item
            finally:
                # Runs once the generator is exhausted or closed.
                status['finalized'] = True

        ag = agen()
        ai = ag.__aiter__()

        async def iter_one():
            # Pull a single item, then chain a fresh task for the next
            # one, so every step runs in its own task.
            try:
                item = await ai.__anext__()
            except StopAsyncIteration:
                return
            if item == 'THREE':
                # Abandon the generator here, leaving 'FOUR' unconsumed.
                status['stopped'] = True
                return
            asyncio.create_task(iter_one())

        asyncio.create_task(iter_one())
        return status
961
962    def test_asyncgen_finalization_by_gc(self):
963        # Async generators should be finalized when garbage collected.
964        self.loop._process_events = mock.Mock()
965        self.loop._write_to_self = mock.Mock()
966        with support.disable_gc():
967            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
968            while not status['stopped']:
969                test_utils.run_briefly(self.loop)
970            self.assertTrue(status['started'])
971            self.assertTrue(status['stopped'])
972            self.assertFalse(status['finalized'])
973            support.gc_collect()
974            test_utils.run_briefly(self.loop)
975            self.assertTrue(status['finalized'])
976
977    def test_asyncgen_finalization_by_gc_in_other_thread(self):
978        # Python issue 34769: If garbage collector runs in another
979        # thread, async generators will not finalize in debug
980        # mode.
981        self.loop._process_events = mock.Mock()
982        self.loop._write_to_self = mock.Mock()
983        self.loop.set_debug(True)
984        with support.disable_gc():
985            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
986            while not status['stopped']:
987                test_utils.run_briefly(self.loop)
988            self.assertTrue(status['started'])
989            self.assertTrue(status['stopped'])
990            self.assertFalse(status['finalized'])
991            self.loop.run_until_complete(
992                self.loop.run_in_executor(None, support.gc_collect))
993            test_utils.run_briefly(self.loop)
994            self.assertTrue(status['finalized'])
995
996
class MyProto(asyncio.Protocol):
    """Stream protocol stub that records its lifecycle in ``state``.

    ``state`` moves INITIAL -> CONNECTED -> (EOF) -> CLOSED and every
    callback raises AssertionError when invoked in an unexpected state.
    ``nbytes`` accumulates the size of all received data.  When built with
    ``create_future=True`` (requires a running loop), ``done`` is a future
    resolved with None once the connection is lost.
    """

    # No completion future unless the constructor is asked to create one.
    done = None

    def __init__(self, create_future=False):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            self.done = asyncio.get_running_loop().create_future()

    def _assert_state(self, *expected):
        """Raise AssertionError unless ``state`` is one of *expected*."""
        if self.state in expected:
            return
        raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state('INITIAL')
        self.state = 'CONNECTED'
        # Send a fixed HTTP request as soon as the connection is up.
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')

    def data_received(self, data):
        self._assert_state('CONNECTED')
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        self._assert_state('CONNECTED')
        self.state = 'EOF'

    def connection_lost(self, exc):
        self._assert_state('CONNECTED', 'EOF')
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
1029
1030
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol stub that records its lifecycle in ``state``.

    ``state`` moves INITIAL -> INITIALIZED -> CLOSED and every callback
    raises AssertionError when invoked in an unexpected state.  ``nbytes``
    accumulates the size of all received datagrams.

    Parameters:
        create_future: when true, ``done`` is a future that is resolved
            with None once the connection is lost.
        loop: event loop used to create ``done``.  When omitted, the
            currently running loop is used (RuntimeError if there is none).
    """

    # No completion future unless the constructor is asked to create one.
    done = None

    def __init__(self, create_future=False, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            if loop is None:
                # Fall back to the running loop instead of failing with
                # AttributeError on None; mirrors MyProto.
                loop = asyncio.get_running_loop()
            self.done = loop.create_future()

    def _assert_state(self, expected):
        """Raise AssertionError unless ``state`` equals *expected*."""
        if self.state != expected:
            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state('INITIAL')
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        self._assert_state('INITIALIZED')
        self.nbytes += len(data)

    def error_received(self, exc):
        self._assert_state('INITIALIZED')

    def connection_lost(self, exc):
        self._assert_state('INITIALIZED')
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)
1061
1062
1063class BaseEventLoopWithSelectorTests(test_utils.TestCase):
1064
1065    def setUp(self):
1066        super().setUp()
1067        self.loop = asyncio.SelectorEventLoop()
1068        self.set_event_loop(self.loop)
1069
1070    @mock.patch('socket.getnameinfo')
1071    def test_getnameinfo(self, m_gai):
1072        m_gai.side_effect = lambda *args: 42
1073        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
1074        self.assertEqual(r, 42)
1075
1076    @patch_socket
1077    def test_create_connection_multiple_errors(self, m_socket):
1078
1079        class MyProto(asyncio.Protocol):
1080            pass
1081
1082        async def getaddrinfo(*args, **kw):
1083            return [(2, 1, 6, '', ('107.6.106.82', 80)),
1084                    (2, 1, 6, '', ('107.6.106.82', 80))]
1085
1086        def getaddrinfo_task(*args, **kwds):
1087            return self.loop.create_task(getaddrinfo(*args, **kwds))
1088
1089        idx = -1
1090        errors = ['err1', 'err2']
1091
1092        def _socket(*args, **kw):
1093            nonlocal idx, errors
1094            idx += 1
1095            raise OSError(errors[idx])
1096
1097        m_socket.socket = _socket
1098
1099        self.loop.getaddrinfo = getaddrinfo_task
1100
1101        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1102        with self.assertRaises(OSError) as cm:
1103            self.loop.run_until_complete(coro)
1104
1105        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1106
1107    @patch_socket
1108    def test_create_connection_timeout(self, m_socket):
1109        # Ensure that the socket is closed on timeout
1110        sock = mock.Mock()
1111        m_socket.socket.return_value = sock
1112
1113        def getaddrinfo(*args, **kw):
1114            fut = self.loop.create_future()
1115            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1116                    ('127.0.0.1', 80))
1117            fut.set_result([addr])
1118            return fut
1119        self.loop.getaddrinfo = getaddrinfo
1120
1121        with mock.patch.object(self.loop, 'sock_connect',
1122                               side_effect=asyncio.TimeoutError):
1123            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1124            with self.assertRaises(asyncio.TimeoutError):
1125                self.loop.run_until_complete(coro)
1126            self.assertTrue(sock.close.called)
1127
1128    def test_create_connection_host_port_sock(self):
1129        coro = self.loop.create_connection(
1130            MyProto, 'example.com', 80, sock=object())
1131        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1132
1133    def test_create_connection_wrong_sock(self):
1134        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1135        with sock:
1136            coro = self.loop.create_connection(MyProto, sock=sock)
1137            with self.assertRaisesRegex(ValueError,
1138                                        'A Stream Socket was expected'):
1139                self.loop.run_until_complete(coro)
1140
1141    def test_create_server_wrong_sock(self):
1142        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1143        with sock:
1144            coro = self.loop.create_server(MyProto, sock=sock)
1145            with self.assertRaisesRegex(ValueError,
1146                                        'A Stream Socket was expected'):
1147                self.loop.run_until_complete(coro)
1148
1149    def test_create_server_ssl_timeout_for_plain_socket(self):
1150        coro = self.loop.create_server(
1151            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1152        with self.assertRaisesRegex(
1153                ValueError,
1154                'ssl_handshake_timeout is only meaningful with ssl'):
1155            self.loop.run_until_complete(coro)
1156
1157    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1158                         'no socket.SOCK_NONBLOCK (linux only)')
1159    def test_create_server_stream_bittype(self):
1160        sock = socket.socket(
1161            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1162        with sock:
1163            coro = self.loop.create_server(lambda: None, sock=sock)
1164            srv = self.loop.run_until_complete(coro)
1165            srv.close()
1166            self.loop.run_until_complete(srv.wait_closed())
1167
1168    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1169    def test_create_server_ipv6(self):
1170        async def main():
1171            srv = await asyncio.start_server(lambda: None, '::1', 0)
1172            try:
1173                self.assertGreater(len(srv.sockets), 0)
1174            finally:
1175                srv.close()
1176                await srv.wait_closed()
1177
1178        try:
1179            self.loop.run_until_complete(main())
1180        except OSError as ex:
1181            if (hasattr(errno, 'EADDRNOTAVAIL') and
1182                    ex.errno == errno.EADDRNOTAVAIL):
1183                self.skipTest('failed to bind to ::1')
1184            else:
1185                raise
1186
1187    def test_create_datagram_endpoint_wrong_sock(self):
1188        sock = socket.socket(socket.AF_INET)
1189        with sock:
1190            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1191            with self.assertRaisesRegex(ValueError,
1192                                        'A UDP Socket was expected'):
1193                self.loop.run_until_complete(coro)
1194
1195    def test_create_connection_no_host_port_sock(self):
1196        coro = self.loop.create_connection(MyProto)
1197        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1198
1199    def test_create_connection_no_getaddrinfo(self):
1200        async def getaddrinfo(*args, **kw):
1201            return []
1202
1203        def getaddrinfo_task(*args, **kwds):
1204            return self.loop.create_task(getaddrinfo(*args, **kwds))
1205
1206        self.loop.getaddrinfo = getaddrinfo_task
1207        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1208        self.assertRaises(
1209            OSError, self.loop.run_until_complete, coro)
1210
1211    def test_create_connection_connect_err(self):
1212        async def getaddrinfo(*args, **kw):
1213            return [(2, 1, 6, '', ('107.6.106.82', 80))]
1214
1215        def getaddrinfo_task(*args, **kwds):
1216            return self.loop.create_task(getaddrinfo(*args, **kwds))
1217
1218        self.loop.getaddrinfo = getaddrinfo_task
1219        self.loop.sock_connect = mock.Mock()
1220        self.loop.sock_connect.side_effect = OSError
1221
1222        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1223        self.assertRaises(
1224            OSError, self.loop.run_until_complete, coro)
1225
1226    def test_create_connection_multiple(self):
1227        async def getaddrinfo(*args, **kw):
1228            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1229                    (2, 1, 6, '', ('0.0.0.2', 80))]
1230
1231        def getaddrinfo_task(*args, **kwds):
1232            return self.loop.create_task(getaddrinfo(*args, **kwds))
1233
1234        self.loop.getaddrinfo = getaddrinfo_task
1235        self.loop.sock_connect = mock.Mock()
1236        self.loop.sock_connect.side_effect = OSError
1237
1238        coro = self.loop.create_connection(
1239            MyProto, 'example.com', 80, family=socket.AF_INET)
1240        with self.assertRaises(OSError):
1241            self.loop.run_until_complete(coro)
1242
1243    @patch_socket
1244    def test_create_connection_multiple_errors_local_addr(self, m_socket):
1245
1246        def bind(addr):
1247            if addr[0] == '0.0.0.1':
1248                err = OSError('Err')
1249                err.strerror = 'Err'
1250                raise err
1251
1252        m_socket.socket.return_value.bind = bind
1253
1254        async def getaddrinfo(*args, **kw):
1255            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1256                    (2, 1, 6, '', ('0.0.0.2', 80))]
1257
1258        def getaddrinfo_task(*args, **kwds):
1259            return self.loop.create_task(getaddrinfo(*args, **kwds))
1260
1261        self.loop.getaddrinfo = getaddrinfo_task
1262        self.loop.sock_connect = mock.Mock()
1263        self.loop.sock_connect.side_effect = OSError('Err2')
1264
1265        coro = self.loop.create_connection(
1266            MyProto, 'example.com', 80, family=socket.AF_INET,
1267            local_addr=(None, 8080))
1268        with self.assertRaises(OSError) as cm:
1269            self.loop.run_until_complete(coro)
1270
1271        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1272        self.assertTrue(m_socket.socket.return_value.close.called)
1273
1274    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1275        # Test the fallback code, even if this system has inet_pton.
1276        if not allow_inet_pton:
1277            del m_socket.inet_pton
1278
1279        m_socket.getaddrinfo = socket.getaddrinfo
1280        sock = m_socket.socket.return_value
1281
1282        self.loop._add_reader = mock.Mock()
1283        self.loop._add_reader._is_coroutine = False
1284        self.loop._add_writer = mock.Mock()
1285        self.loop._add_writer._is_coroutine = False
1286
1287        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1288        t, p = self.loop.run_until_complete(coro)
1289        try:
1290            sock.connect.assert_called_with(('1.2.3.4', 80))
1291            _, kwargs = m_socket.socket.call_args
1292            self.assertEqual(kwargs['family'], m_socket.AF_INET)
1293            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1294        finally:
1295            t.close()
1296            test_utils.run_briefly(self.loop)  # allow transport to close
1297
1298        if socket_helper.IPV6_ENABLED:
1299            sock.family = socket.AF_INET6
1300            coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1301            t, p = self.loop.run_until_complete(coro)
1302            try:
1303                # Without inet_pton we use getaddrinfo, which transforms
1304                # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
1305                # scope id.
1306                [address] = sock.connect.call_args[0]
1307                host, port = address[:2]
1308                self.assertRegex(host, r'::(0\.)*1')
1309                self.assertEqual(port, 80)
1310                _, kwargs = m_socket.socket.call_args
1311                self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1312                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1313            finally:
1314                t.close()
1315                test_utils.run_briefly(self.loop)  # allow transport to close
1316
1317    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
1318    @unittest.skipIf(sys.platform.startswith('aix'),
1319                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
1320    @patch_socket
1321    def test_create_connection_ipv6_scope(self, m_socket):
1322        m_socket.getaddrinfo = socket.getaddrinfo
1323        sock = m_socket.socket.return_value
1324        sock.family = socket.AF_INET6
1325
1326        self.loop._add_reader = mock.Mock()
1327        self.loop._add_reader._is_coroutine = False
1328        self.loop._add_writer = mock.Mock()
1329        self.loop._add_writer._is_coroutine = False
1330
1331        coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
1332        t, p = self.loop.run_until_complete(coro)
1333        try:
1334            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
1335            _, kwargs = m_socket.socket.call_args
1336            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1337            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1338        finally:
1339            t.close()
1340            test_utils.run_briefly(self.loop)  # allow transport to close
1341
1342    @patch_socket
1343    def test_create_connection_ip_addr(self, m_socket):
1344        self._test_create_connection_ip_addr(m_socket, True)
1345
1346    @patch_socket
1347    def test_create_connection_no_inet_pton(self, m_socket):
1348        self._test_create_connection_ip_addr(m_socket, False)
1349
1350    @patch_socket
1351    def test_create_connection_service_name(self, m_socket):
1352        m_socket.getaddrinfo = socket.getaddrinfo
1353        sock = m_socket.socket.return_value
1354
1355        self.loop._add_reader = mock.Mock()
1356        self.loop._add_reader._is_coroutine = False
1357        self.loop._add_writer = mock.Mock()
1358        self.loop._add_writer._is_coroutine = False
1359
1360        for service, port in ('http', 80), (b'http', 80):
1361            coro = self.loop.create_connection(asyncio.Protocol,
1362                                               '127.0.0.1', service)
1363
1364            t, p = self.loop.run_until_complete(coro)
1365            try:
1366                sock.connect.assert_called_with(('127.0.0.1', port))
1367                _, kwargs = m_socket.socket.call_args
1368                self.assertEqual(kwargs['family'], m_socket.AF_INET)
1369                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1370            finally:
1371                t.close()
1372                test_utils.run_briefly(self.loop)  # allow transport to close
1373
1374        for service in 'nonsense', b'nonsense':
1375            coro = self.loop.create_connection(asyncio.Protocol,
1376                                               '127.0.0.1', service)
1377
1378            with self.assertRaises(OSError):
1379                self.loop.run_until_complete(coro)
1380
1381    def test_create_connection_no_local_addr(self):
1382        async def getaddrinfo(host, *args, **kw):
1383            if host == 'example.com':
1384                return [(2, 1, 6, '', ('107.6.106.82', 80)),
1385                        (2, 1, 6, '', ('107.6.106.82', 80))]
1386            else:
1387                return []
1388
1389        def getaddrinfo_task(*args, **kwds):
1390            return self.loop.create_task(getaddrinfo(*args, **kwds))
1391        self.loop.getaddrinfo = getaddrinfo_task
1392
1393        coro = self.loop.create_connection(
1394            MyProto, 'example.com', 80, family=socket.AF_INET,
1395            local_addr=(None, 8080))
1396        self.assertRaises(
1397            OSError, self.loop.run_until_complete, coro)
1398
1399    @patch_socket
1400    def test_create_connection_bluetooth(self, m_socket):
1401        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1402        # we can't recognize an address is resolved, e.g. a Bluetooth address.
1403        addr = ('00:01:02:03:04:05', 1)
1404
1405        def getaddrinfo(host, port, *args, **kw):
1406            self.assertEqual((host, port), addr)
1407            return [(999, 1, 999, '', (addr, 1))]
1408
1409        m_socket.getaddrinfo = getaddrinfo
1410        sock = m_socket.socket()
1411        coro = self.loop.sock_connect(sock, addr)
1412        self.loop.run_until_complete(coro)
1413
1414    def test_create_connection_ssl_server_hostname_default(self):
1415        self.loop.getaddrinfo = mock.Mock()
1416
1417        def mock_getaddrinfo(*args, **kwds):
1418            f = self.loop.create_future()
1419            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1420                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
1421            return f
1422
1423        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1424        self.loop.sock_connect = mock.Mock()
1425        self.loop.sock_connect.return_value = self.loop.create_future()
1426        self.loop.sock_connect.return_value.set_result(None)
1427        self.loop._make_ssl_transport = mock.Mock()
1428
1429        class _SelectorTransportMock:
1430            _sock = None
1431
1432            def get_extra_info(self, key):
1433                return mock.Mock()
1434
1435            def close(self):
1436                self._sock.close()
1437
1438        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1439                                    **kwds):
1440            waiter.set_result(None)
1441            transport = _SelectorTransportMock()
1442            transport._sock = sock
1443            return transport
1444
1445        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1446        ANY = mock.ANY
1447        handshake_timeout = object()
1448        # First try the default server_hostname.
1449        self.loop._make_ssl_transport.reset_mock()
1450        coro = self.loop.create_connection(
1451                MyProto, 'python.org', 80, ssl=True,
1452                ssl_handshake_timeout=handshake_timeout)
1453        transport, _ = self.loop.run_until_complete(coro)
1454        transport.close()
1455        self.loop._make_ssl_transport.assert_called_with(
1456            ANY, ANY, ANY, ANY,
1457            server_side=False,
1458            server_hostname='python.org',
1459            ssl_handshake_timeout=handshake_timeout)
1460        # Next try an explicit server_hostname.
1461        self.loop._make_ssl_transport.reset_mock()
1462        coro = self.loop.create_connection(
1463                MyProto, 'python.org', 80, ssl=True,
1464                server_hostname='perl.com',
1465                ssl_handshake_timeout=handshake_timeout)
1466        transport, _ = self.loop.run_until_complete(coro)
1467        transport.close()
1468        self.loop._make_ssl_transport.assert_called_with(
1469            ANY, ANY, ANY, ANY,
1470            server_side=False,
1471            server_hostname='perl.com',
1472            ssl_handshake_timeout=handshake_timeout)
1473        # Finally try an explicit empty server_hostname.
1474        self.loop._make_ssl_transport.reset_mock()
1475        coro = self.loop.create_connection(
1476                MyProto, 'python.org', 80, ssl=True,
1477                server_hostname='',
1478                ssl_handshake_timeout=handshake_timeout)
1479        transport, _ = self.loop.run_until_complete(coro)
1480        transport.close()
1481        self.loop._make_ssl_transport.assert_called_with(
1482                ANY, ANY, ANY, ANY,
1483                server_side=False,
1484                server_hostname='',
1485                ssl_handshake_timeout=handshake_timeout)
1486
1487    def test_create_connection_no_ssl_server_hostname_errors(self):
1488        # When not using ssl, server_hostname must be None.
1489        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1490                                           server_hostname='')
1491        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1492        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1493                                           server_hostname='python.org')
1494        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1495
1496    def test_create_connection_ssl_server_hostname_errors(self):
1497        # When using ssl, server_hostname may be None if host is non-empty.
1498        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1499        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1500        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1501        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1502        sock = socket.socket()
1503        coro = self.loop.create_connection(MyProto, None, None,
1504                                           ssl=True, sock=sock)
1505        self.addCleanup(sock.close)
1506        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1507
1508    def test_create_connection_ssl_timeout_for_plain_socket(self):
1509        coro = self.loop.create_connection(
1510            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1511        with self.assertRaisesRegex(
1512                ValueError,
1513                'ssl_handshake_timeout is only meaningful with ssl'):
1514            self.loop.run_until_complete(coro)
1515
1516    def test_create_server_empty_host(self):
1517        # if host is empty string use None instead
1518        host = object()
1519
1520        async def getaddrinfo(*args, **kw):
1521            nonlocal host
1522            host = args[0]
1523            return []
1524
1525        def getaddrinfo_task(*args, **kwds):
1526            return self.loop.create_task(getaddrinfo(*args, **kwds))
1527
1528        self.loop.getaddrinfo = getaddrinfo_task
1529        fut = self.loop.create_server(MyProto, '', 0)
1530        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1531        self.assertIsNone(host)
1532
1533    def test_create_server_host_port_sock(self):
1534        fut = self.loop.create_server(
1535            MyProto, '0.0.0.0', 0, sock=object())
1536        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1537
1538    def test_create_server_no_host_port_sock(self):
1539        fut = self.loop.create_server(MyProto)
1540        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1541
1542    def test_create_server_no_getaddrinfo(self):
1543        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1544        getaddrinfo.return_value = self.loop.create_future()
1545        getaddrinfo.return_value.set_result(None)
1546
1547        f = self.loop.create_server(MyProto, 'python.org', 0)
1548        self.assertRaises(OSError, self.loop.run_until_complete, f)
1549
1550    @patch_socket
1551    def test_create_server_nosoreuseport(self, m_socket):
1552        m_socket.getaddrinfo = socket.getaddrinfo
1553        del m_socket.SO_REUSEPORT
1554        m_socket.socket.return_value = mock.Mock()
1555
1556        f = self.loop.create_server(
1557            MyProto, '0.0.0.0', 0, reuse_port=True)
1558
1559        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1560
1561    @patch_socket
1562    def test_create_server_soreuseport_only_defined(self, m_socket):
1563        m_socket.getaddrinfo = socket.getaddrinfo
1564        m_socket.socket.return_value = mock.Mock()
1565        m_socket.SO_REUSEPORT = -1
1566
1567        f = self.loop.create_server(
1568            MyProto, '0.0.0.0', 0, reuse_port=True)
1569
1570        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1571
1572    @patch_socket
1573    def test_create_server_cant_bind(self, m_socket):
1574
1575        class Err(OSError):
1576            strerror = 'error'
1577
1578        m_socket.getaddrinfo.return_value = [
1579            (2, 1, 6, '', ('127.0.0.1', 10100))]
1580        m_socket.getaddrinfo._is_coroutine = False
1581        m_sock = m_socket.socket.return_value = mock.Mock()
1582        m_sock.bind.side_effect = Err
1583
1584        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1585        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1586        self.assertTrue(m_sock.close.called)
1587
1588    @patch_socket
1589    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1590        m_socket.getaddrinfo.return_value = []
1591        m_socket.getaddrinfo._is_coroutine = False
1592
1593        coro = self.loop.create_datagram_endpoint(
1594            MyDatagramProto, local_addr=('localhost', 0))
1595        self.assertRaises(
1596            OSError, self.loop.run_until_complete, coro)
1597
1598    def test_create_datagram_endpoint_addr_error(self):
1599        coro = self.loop.create_datagram_endpoint(
1600            MyDatagramProto, local_addr='localhost')
1601        self.assertRaises(
1602            AssertionError, self.loop.run_until_complete, coro)
1603        coro = self.loop.create_datagram_endpoint(
1604            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1605        self.assertRaises(
1606            AssertionError, self.loop.run_until_complete, coro)
1607
1608    def test_create_datagram_endpoint_connect_err(self):
1609        self.loop.sock_connect = mock.Mock()
1610        self.loop.sock_connect.side_effect = OSError
1611
1612        coro = self.loop.create_datagram_endpoint(
1613            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1614        self.assertRaises(
1615            OSError, self.loop.run_until_complete, coro)
1616
1617    def test_create_datagram_endpoint_allow_broadcast(self):
1618        protocol = MyDatagramProto(create_future=True, loop=self.loop)
1619        self.loop.sock_connect = sock_connect = mock.Mock()
1620        sock_connect.return_value = []
1621
1622        coro = self.loop.create_datagram_endpoint(
1623            lambda: protocol,
1624            remote_addr=('127.0.0.1', 0),
1625            allow_broadcast=True)
1626
1627        transport, _ = self.loop.run_until_complete(coro)
1628        self.assertFalse(sock_connect.called)
1629
1630        transport.close()
1631        self.loop.run_until_complete(protocol.done)
1632        self.assertEqual('CLOSED', protocol.state)
1633
1634    @patch_socket
1635    def test_create_datagram_endpoint_socket_err(self, m_socket):
1636        m_socket.getaddrinfo = socket.getaddrinfo
1637        m_socket.socket.side_effect = OSError
1638
1639        coro = self.loop.create_datagram_endpoint(
1640            asyncio.DatagramProtocol, family=socket.AF_INET)
1641        self.assertRaises(
1642            OSError, self.loop.run_until_complete, coro)
1643
1644        coro = self.loop.create_datagram_endpoint(
1645            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1646        self.assertRaises(
1647            OSError, self.loop.run_until_complete, coro)
1648
1649    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1650    def test_create_datagram_endpoint_no_matching_family(self):
1651        coro = self.loop.create_datagram_endpoint(
1652            asyncio.DatagramProtocol,
1653            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1654        self.assertRaises(
1655            ValueError, self.loop.run_until_complete, coro)
1656
1657    @patch_socket
1658    def test_create_datagram_endpoint_setblk_err(self, m_socket):
1659        m_socket.socket.return_value.setblocking.side_effect = OSError
1660
1661        coro = self.loop.create_datagram_endpoint(
1662            asyncio.DatagramProtocol, family=socket.AF_INET)
1663        self.assertRaises(
1664            OSError, self.loop.run_until_complete, coro)
1665        self.assertTrue(
1666            m_socket.socket.return_value.close.called)
1667
1668    def test_create_datagram_endpoint_noaddr_nofamily(self):
1669        coro = self.loop.create_datagram_endpoint(
1670            asyncio.DatagramProtocol)
1671        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1672
1673    @patch_socket
1674    def test_create_datagram_endpoint_cant_bind(self, m_socket):
1675        class Err(OSError):
1676            pass
1677
1678        m_socket.getaddrinfo = socket.getaddrinfo
1679        m_sock = m_socket.socket.return_value = mock.Mock()
1680        m_sock.bind.side_effect = Err
1681
1682        fut = self.loop.create_datagram_endpoint(
1683            MyDatagramProto,
1684            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1685        self.assertRaises(Err, self.loop.run_until_complete, fut)
1686        self.assertTrue(m_sock.close.called)
1687
1688    def test_create_datagram_endpoint_sock(self):
1689        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1690        sock.bind(('127.0.0.1', 0))
1691        fut = self.loop.create_datagram_endpoint(
1692            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1693            sock=sock)
1694        transport, protocol = self.loop.run_until_complete(fut)
1695        transport.close()
1696        self.loop.run_until_complete(protocol.done)
1697        self.assertEqual('CLOSED', protocol.state)
1698
1699    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1700    def test_create_datagram_endpoint_sock_unix(self):
1701        fut = self.loop.create_datagram_endpoint(
1702            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1703            family=socket.AF_UNIX)
1704        transport, protocol = self.loop.run_until_complete(fut)
1705        self.assertEqual(transport._sock.family, socket.AF_UNIX)
1706        transport.close()
1707        self.loop.run_until_complete(protocol.done)
1708        self.assertEqual('CLOSED', protocol.state)
1709
1710    @socket_helper.skip_unless_bind_unix_socket
1711    def test_create_datagram_endpoint_existing_sock_unix(self):
1712        with test_utils.unix_socket_path() as path:
1713            sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
1714            sock.bind(path)
1715            sock.close()
1716
1717            coro = self.loop.create_datagram_endpoint(
1718                lambda: MyDatagramProto(create_future=True, loop=self.loop),
1719                path, family=socket.AF_UNIX)
1720            transport, protocol = self.loop.run_until_complete(coro)
1721            transport.close()
1722            self.loop.run_until_complete(protocol.done)
1723
1724    def test_create_datagram_endpoint_sock_sockopts(self):
1725        class FakeSock:
1726            type = socket.SOCK_DGRAM
1727
1728        fut = self.loop.create_datagram_endpoint(
1729            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1730        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1731
1732        fut = self.loop.create_datagram_endpoint(
1733            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1734        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1735
1736        fut = self.loop.create_datagram_endpoint(
1737            MyDatagramProto, family=1, sock=FakeSock())
1738        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1739
1740        fut = self.loop.create_datagram_endpoint(
1741            MyDatagramProto, proto=1, sock=FakeSock())
1742        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1743
1744        fut = self.loop.create_datagram_endpoint(
1745            MyDatagramProto, flags=1, sock=FakeSock())
1746        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1747
1748        fut = self.loop.create_datagram_endpoint(
1749            MyDatagramProto, reuse_port=True, sock=FakeSock())
1750        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1751
1752        fut = self.loop.create_datagram_endpoint(
1753            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1754        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1755
1756    @unittest.skipIf(sys.platform == 'vxworks',
1757                    "SO_BROADCAST is enabled by default on VxWorks")
1758    def test_create_datagram_endpoint_sockopts(self):
1759        # Socket options should not be applied unless asked for.
1760        # SO_REUSEPORT is not available on all platforms.
1761
1762        coro = self.loop.create_datagram_endpoint(
1763            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1764            local_addr=('127.0.0.1', 0))
1765        transport, protocol = self.loop.run_until_complete(coro)
1766        sock = transport.get_extra_info('socket')
1767
1768        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1769
1770        if reuseport_supported:
1771            self.assertFalse(
1772                sock.getsockopt(
1773                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1774        self.assertFalse(
1775            sock.getsockopt(
1776                socket.SOL_SOCKET, socket.SO_BROADCAST))
1777
1778        transport.close()
1779        self.loop.run_until_complete(protocol.done)
1780        self.assertEqual('CLOSED', protocol.state)
1781
1782        coro = self.loop.create_datagram_endpoint(
1783            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1784            local_addr=('127.0.0.1', 0),
1785            reuse_port=reuseport_supported,
1786            allow_broadcast=True)
1787        transport, protocol = self.loop.run_until_complete(coro)
1788        sock = transport.get_extra_info('socket')
1789
1790        self.assertFalse(
1791            sock.getsockopt(
1792                socket.SOL_SOCKET, socket.SO_REUSEADDR))
1793        if reuseport_supported:
1794            self.assertTrue(
1795                sock.getsockopt(
1796                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1797        self.assertTrue(
1798            sock.getsockopt(
1799                socket.SOL_SOCKET, socket.SO_BROADCAST))
1800
1801        transport.close()
1802        self.loop.run_until_complete(protocol.done)
1803        self.assertEqual('CLOSED', protocol.state)
1804
1805    def test_create_datagram_endpoint_reuse_address_error(self):
1806        # bpo-37228: Ensure that explicit passing of `reuse_address=True`
1807        # raises an error, as it is not safe to use SO_REUSEADDR when using UDP
1808
1809        coro = self.loop.create_datagram_endpoint(
1810            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1811            local_addr=('127.0.0.1', 0),
1812            reuse_address=True)
1813
1814        with self.assertRaises(ValueError):
1815            self.loop.run_until_complete(coro)
1816
1817    def test_create_datagram_endpoint_reuse_address_warning(self):
1818        # bpo-37228: Deprecate *reuse_address* parameter
1819
1820        coro = self.loop.create_datagram_endpoint(
1821            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1822            local_addr=('127.0.0.1', 0),
1823            reuse_address=False)
1824
1825        with self.assertWarns(DeprecationWarning):
1826            transport, protocol = self.loop.run_until_complete(coro)
1827            transport.close()
1828            self.loop.run_until_complete(protocol.done)
1829            self.assertEqual('CLOSED', protocol.state)
1830
1831    @patch_socket
1832    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1833        del m_socket.SO_REUSEPORT
1834        m_socket.socket.return_value = mock.Mock()
1835
1836        coro = self.loop.create_datagram_endpoint(
1837            lambda: MyDatagramProto(loop=self.loop),
1838            local_addr=('127.0.0.1', 0),
1839            reuse_port=True)
1840
1841        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1842
1843    @patch_socket
1844    def test_create_datagram_endpoint_ip_addr(self, m_socket):
1845        def getaddrinfo(*args, **kw):
1846            self.fail('should not have called getaddrinfo')
1847
1848        m_socket.getaddrinfo = getaddrinfo
1849        m_socket.socket.return_value.bind = bind = mock.Mock()
1850        self.loop._add_reader = mock.Mock()
1851        self.loop._add_reader._is_coroutine = False
1852
1853        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1854        coro = self.loop.create_datagram_endpoint(
1855            lambda: MyDatagramProto(loop=self.loop),
1856            local_addr=('1.2.3.4', 0),
1857            reuse_port=reuseport_supported)
1858
1859        t, p = self.loop.run_until_complete(coro)
1860        try:
1861            bind.assert_called_with(('1.2.3.4', 0))
1862            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1863                                               proto=m_socket.IPPROTO_UDP,
1864                                               type=m_socket.SOCK_DGRAM)
1865        finally:
1866            t.close()
1867            test_utils.run_briefly(self.loop)  # allow transport to close
1868
1869    def test_accept_connection_retry(self):
1870        sock = mock.Mock()
1871        sock.accept.side_effect = BlockingIOError()
1872
1873        self.loop._accept_connection(MyProto, sock)
1874        self.assertFalse(sock.close.called)
1875
1876    @mock.patch('asyncio.base_events.logger')
1877    def test_accept_connection_exception(self, m_log):
1878        sock = mock.Mock()
1879        sock.fileno.return_value = 10
1880        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1881        self.loop._remove_reader = mock.Mock()
1882        self.loop.call_later = mock.Mock()
1883
1884        self.loop._accept_connection(MyProto, sock)
1885        self.assertTrue(m_log.error.called)
1886        self.assertFalse(sock.close.called)
1887        self.loop._remove_reader.assert_called_with(10)
1888        self.loop.call_later.assert_called_with(
1889            constants.ACCEPT_RETRY_DELAY,
1890            # self.loop._start_serving
1891            mock.ANY,
1892            MyProto, sock, None, None, mock.ANY, mock.ANY)
1893
1894    def test_call_coroutine(self):
1895        with self.assertWarns(DeprecationWarning):
1896            @asyncio.coroutine
1897            def simple_coroutine():
1898                pass
1899
1900        self.loop.set_debug(True)
1901        coro_func = simple_coroutine
1902        coro_obj = coro_func()
1903        self.addCleanup(coro_obj.close)
1904        for func in (coro_func, coro_obj):
1905            with self.assertRaises(TypeError):
1906                self.loop.call_soon(func)
1907            with self.assertRaises(TypeError):
1908                self.loop.call_soon_threadsafe(func)
1909            with self.assertRaises(TypeError):
1910                self.loop.call_later(60, func)
1911            with self.assertRaises(TypeError):
1912                self.loop.call_at(self.loop.time() + 60, func)
1913            with self.assertRaises(TypeError):
1914                self.loop.run_until_complete(
1915                    self.loop.run_in_executor(None, func))
1916
1917    @mock.patch('asyncio.base_events.logger')
1918    def test_log_slow_callbacks(self, m_logger):
1919        def stop_loop_cb(loop):
1920            loop.stop()
1921
1922        async def stop_loop_coro(loop):
1923            loop.stop()
1924
1925        asyncio.set_event_loop(self.loop)
1926        self.loop.set_debug(True)
1927        self.loop.slow_callback_duration = 0.0
1928
1929        # slow callback
1930        self.loop.call_soon(stop_loop_cb, self.loop)
1931        self.loop.run_forever()
1932        fmt, *args = m_logger.warning.call_args[0]
1933        self.assertRegex(fmt % tuple(args),
1934                         "^Executing <Handle.*stop_loop_cb.*> "
1935                         "took .* seconds$")
1936
1937        # slow task
1938        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1939        self.loop.run_forever()
1940        fmt, *args = m_logger.warning.call_args[0]
1941        self.assertRegex(fmt % tuple(args),
1942                         "^Executing <Task.*stop_loop_coro.*> "
1943                         "took .* seconds$")
1944
1945
class RunningLoopTests(unittest.TestCase):

    def test_running_loop_within_a_loop(self):
        # Calling run_forever() on one loop from inside a coroutine that is
        # being driven by another loop is expected to raise RuntimeError.
        inner_loop = asyncio.new_event_loop()
        outer_loop = asyncio.new_event_loop()

        async def run_inner():
            inner_loop.run_forever()

        try:
            with self.assertRaisesRegex(RuntimeError,
                                        'while another loop is running'):
                outer_loop.run_until_complete(run_inner())
        finally:
            inner_loop.close()
            outer_loop.close()
1961
1962
class BaseLoopSockSendfileTests(test_utils.TestCase):

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        # Protocol for the accepting side: records lifecycle flags,
        # accumulates every received chunk in ``data`` and resolves ``fut``
        # once the connection is lost.

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)
            self.transport = None

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Shrink the fallback read buffer (the old value is put back in
        # tearDownClass) and write the file that the tests send.
        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
        with open(os_helper.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
        os_helper.unlink(os_helper.TESTFN)
        super().tearDownClass()

    def setUp(self):
        from asyncio.selector_events import BaseSelectorEventLoop
        # BaseSelectorEventLoop() has no native implementation
        self.loop = BaseSelectorEventLoop()
        self.set_event_loop(self.loop)
        self.file = open(os_helper.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, blocking=False):
        # Return a TCP socket in the requested blocking mode; it is closed
        # automatically at test cleanup.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(blocking)
        self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        return self.loop.run_until_complete(coro)

    def prepare(self):
        # Start a server backed by a single MyProto instance, connect a
        # client socket to it and return (client_socket, protocol).
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        server = self.run_loop(self.loop.create_server(
            lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
        addr = server.sockets[0].getsockname()

        connected = False
        for _ in range(10):
            try:
                self.run_loop(self.loop.sock_connect(sock, addr))
            except OSError:
                self.run_loop(asyncio.sleep(0.5))
            else:
                connected = True
                break
        if not connected:
            # One last try, so we get the exception
            self.run_loop(self.loop.sock_connect(sock, addr))

        def cleanup():
            server.close()
            self.run_loop(server.wait_closed())
            sock.close()
            if proto.transport is not None:
                proto.transport.close()
                self.run_loop(proto.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def _check_sendfile_error(self, exc_type, regex, sock, file, *args):
        # Run sock_sendfile(sock, file, *args) and require that it raises
        # exc_type with a message matching regex.
        with self.assertRaisesRegex(exc_type, regex):
            self.run_loop(self.loop.sock_sendfile(sock, file, *args))

    def test__sock_sendfile_native_failure(self):
        sock, proto = self.prepare()

        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "sendfile is not available"):
            native = self.loop._sock_sendfile_native(
                sock, self.file, 0, None)
            self.run_loop(native)

        # Nothing was sent and the file position was left untouched.
        self.assertEqual(proto.data, b'')
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_no_fallback(self):
        sock, proto = self.prepare()

        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "sendfile is not available"):
            sending = self.loop.sock_sendfile(
                sock, self.file, fallback=False)
            self.run_loop(sending)

        self.assertEqual(self.file.tell(), 0)
        self.assertEqual(proto.data, b'')

    def test_sock_sendfile_fallback(self):
        sock, proto = self.prepare()
        expected_size = len(self.DATA)

        sent = self.run_loop(self.loop.sock_sendfile(sock, self.file))
        sock.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(sent, expected_size)
        self.assertEqual(self.file.tell(), expected_size)
        self.assertEqual(proto.data, self.DATA)

    def test_sock_sendfile_fallback_offset_and_count(self):
        sock, proto = self.prepare()

        sent = self.run_loop(
            self.loop.sock_sendfile(sock, self.file, 1000, 2000))
        sock.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(sent, 2000)
        self.assertEqual(self.file.tell(), 3000)
        self.assertEqual(proto.data, self.DATA[1000:3000])

    def test_blocking_socket(self):
        self.loop.set_debug(True)
        sock = self.make_socket(blocking=True)
        self._check_sendfile_error(
            ValueError, "must be non-blocking", sock, self.file)

    def test_nonbinary_file(self):
        sock = self.make_socket()
        with open(os_helper.TESTFN, encoding="utf-8") as f:
            self._check_sendfile_error(
                ValueError, "binary mode", sock, f)

    def test_nonstream_socket(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setblocking(False)
        self.addCleanup(sock.close)
        self._check_sendfile_error(
            ValueError, "only SOCK_STREAM type", sock, self.file)

    def test_notint_count(self):
        sock = self.make_socket()
        self._check_sendfile_error(
            TypeError, "count must be a positive integer",
            sock, self.file, 0, 'count')

    def test_negative_count(self):
        sock = self.make_socket()
        self._check_sendfile_error(
            ValueError, "count must be a positive integer",
            sock, self.file, 0, -1)

    def test_notint_offset(self):
        sock = self.make_socket()
        self._check_sendfile_error(
            TypeError, "offset must be a non-negative integer",
            sock, self.file, 'offset')

    def test_negative_offset(self):
        sock = self.make_socket()
        self._check_sendfile_error(
            ValueError, "offset must be a non-negative integer",
            sock, self.file, -1)
2141
2142
class TestSelectorUtils(test_utils.TestCase):
    def check_set_nodelay(self, sock):
        # TCP_NODELAY must be off before base_events._set_nodelay(sock) and
        # on afterwards.
        def nodelay():
            return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)

        self.assertFalse(nodelay())

        base_events._set_nodelay(sock)

        self.assertTrue(nodelay())

    @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
                         'need socket.TCP_NODELAY')
    def test_set_nodelay(self):
        # Run the check once on a socket left in its default mode and once
        # on a socket explicitly switched to non-blocking.
        for make_nonblocking in (False, True):
            sock = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_STREAM,
                                 proto=socket.IPPROTO_TCP)
            with sock:
                if make_nonblocking:
                    sock.setblocking(False)
                self.check_set_nodelay(sock)
2166
2167
2168
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
2171