• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for base_events.py"""
2
3import concurrent.futures
4import errno
5import math
6import os
7import socket
8import sys
9import threading
10import time
11import unittest
12from unittest import mock
13
14import asyncio
15from asyncio import base_events
16from asyncio import constants
17from test.test_asyncio import utils as test_utils
18from test import support
19from test.support.script_helper import assert_python_ok
20
21
# Shorthand for mock.ANY; used in expected call arguments where any value
# is acceptable (e.g. the value/traceback slots of an exc_info tuple).
MOCK_ANY = mock.ANY
# True when running on Python 3.4 or newer.
PY34 = sys.version_info >= (3, 4)
24
25
def tearDownModule():
    # Module-level unittest fixture: reset the asyncio event loop policy by
    # passing None to set_event_loop_policy().
    asyncio.set_event_loop_policy(None)
28
29
def mock_socket_module():
    """Return a MagicMock standing in for the ``socket`` module.

    A fixed set of attributes is copied over from the real module when it
    provides them; names the real module lacks are deleted from the mock so
    that attribute access fails for them as well.  ``socket()`` on the mock
    hands back the non-blocking socket mock from ``test_utils``.
    """
    passthrough_names = (
        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
    )
    fake_module = mock.MagicMock(spec=socket)
    for attr in passthrough_names:
        try:
            real_value = getattr(socket, attr)
        except AttributeError:
            delattr(fake_module, attr)
        else:
            setattr(fake_module, attr, real_value)

    fake_module.socket = mock.MagicMock(
        return_value=test_utils.mock_nonblocking_socket())
    fake_module.getaddrinfo._is_coroutine = False

    return fake_module
46
47
def patch_socket(f):
    """Decorate *f* so ``asyncio.base_events.socket`` is a mocked module.

    The replacement is built by ``mock_socket_module`` each time the patch
    is entered.
    """
    patcher = mock.patch('asyncio.base_events.socket',
                         new_callable=mock_socket_module)
    return patcher(f)
51
52
class BaseEventTests(test_utils.TestCase):
    # Exercises base_events._ipaddr_info() with literal IP addresses and
    # different family / socket type / protocol / port combinations.

    def test_ipaddr_info(self):
        ipaddr_info = base_events._ipaddr_info
        unspec = socket.AF_UNSPEC
        inet = socket.AF_INET
        inet6 = socket.AF_INET6
        stream = socket.SOCK_STREAM
        dgram = socket.SOCK_DGRAM
        tcp = socket.IPPROTO_TCP
        udp = socket.IPPROTO_UDP

        v4_tcp = (inet, stream, tcp, '', ('1.2.3.4', 1))
        v4_udp = (inet, dgram, udp, '', ('1.2.3.4', 1))

        # Host given as str, then as bytes, with an explicit IPv4 family.
        self.assertEqual(
            v4_tcp, ipaddr_info('1.2.3.4', 1, inet, stream, tcp))
        self.assertEqual(
            v4_tcp, ipaddr_info(b'1.2.3.4', 1, inet, stream, tcp))

        # Unspecified family.
        self.assertEqual(
            v4_tcp, ipaddr_info('1.2.3.4', 1, unspec, stream, tcp))
        self.assertEqual(
            v4_udp, ipaddr_info('1.2.3.4', 1, unspec, dgram, udp))

        # Socket type STREAM implies TCP protocol.
        self.assertEqual(
            v4_tcp, ipaddr_info('1.2.3.4', 1, unspec, stream, 0))

        # Socket type DGRAM implies UDP protocol.
        self.assertEqual(
            v4_udp, ipaddr_info('1.2.3.4', 1, unspec, dgram, 0))

        # No socket type.
        self.assertIsNone(ipaddr_info('1.2.3.4', 1, unspec, 0, 0))

        if not support.IPV6_ENABLED:
            return

        v6_tcp = (inet6, stream, tcp, '', ('::3', 1, 0, 0))

        # IPv4 address with family IPv6.
        self.assertIsNone(ipaddr_info('1.2.3.4', 1, inet6, stream, tcp))

        self.assertEqual(v6_tcp, ipaddr_info('::3', 1, inet6, stream, tcp))
        self.assertEqual(v6_tcp, ipaddr_info('::3', 1, unspec, stream, tcp))

        # IPv6 address with family IPv4.
        self.assertIsNone(ipaddr_info('::3', 1, inet, stream, tcp))

        # IPv6 address with zone index.
        self.assertIsNone(ipaddr_info('::3%lo0', 1, inet6, stream, tcp))

    def test_port_parameter_types(self):
        # Test obscure kinds of arguments for "port".
        inet = socket.AF_INET
        stream = socket.SOCK_STREAM
        tcp = socket.IPPROTO_TCP

        # (port argument, port expected in the returned address)
        cases = (
            (None, 0),
            (b'', 0),
            ('', 0),
            ('1', 1),
            (b'1', 1),
        )
        for port, resolved_port in cases:
            self.assertEqual(
                (inet, stream, tcp, '', ('1.2.3.4', resolved_port)),
                base_events._ipaddr_info('1.2.3.4', port, inet, stream, tcp))

    @patch_socket
    def test_ipaddr_info_no_inet_pton(self, m_socket):
        # Without inet_pton on the (mocked) socket module no address info
        # is returned.
        delattr(m_socket, 'inet_pton')
        result = base_events._ipaddr_info(
            '1.2.3.4', 1,
            socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        self.assertIsNone(result)
148
149
150class BaseEventLoopTests(test_utils.TestCase):
151
152    def setUp(self):
153        super().setUp()
154        self.loop = base_events.BaseEventLoop()
155        self.loop._selector = mock.Mock()
156        self.loop._selector.select.return_value = ()
157        self.set_event_loop(self.loop)
158
159    def test_not_implemented(self):
160        m = mock.Mock()
161        self.assertRaises(
162            NotImplementedError,
163            self.loop._make_socket_transport, m, m)
164        self.assertRaises(
165            NotImplementedError,
166            self.loop._make_ssl_transport, m, m, m, m)
167        self.assertRaises(
168            NotImplementedError,
169            self.loop._make_datagram_transport, m, m)
170        self.assertRaises(
171            NotImplementedError, self.loop._process_events, [])
172        self.assertRaises(
173            NotImplementedError, self.loop._write_to_self)
174        self.assertRaises(
175            NotImplementedError,
176            self.loop._make_read_pipe_transport, m, m)
177        self.assertRaises(
178            NotImplementedError,
179            self.loop._make_write_pipe_transport, m, m)
180        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
181        with self.assertRaises(NotImplementedError):
182            gen.send(None)
183
184    def test_close(self):
185        self.assertFalse(self.loop.is_closed())
186        self.loop.close()
187        self.assertTrue(self.loop.is_closed())
188
189        # it should be possible to call close() more than once
190        self.loop.close()
191        self.loop.close()
192
193        # operation blocked when the loop is closed
194        f = self.loop.create_future()
195        self.assertRaises(RuntimeError, self.loop.run_forever)
196        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
197
198    def test__add_callback_handle(self):
199        h = asyncio.Handle(lambda: False, (), self.loop, None)
200
201        self.loop._add_callback(h)
202        self.assertFalse(self.loop._scheduled)
203        self.assertIn(h, self.loop._ready)
204
205    def test__add_callback_cancelled_handle(self):
206        h = asyncio.Handle(lambda: False, (), self.loop, None)
207        h.cancel()
208
209        self.loop._add_callback(h)
210        self.assertFalse(self.loop._scheduled)
211        self.assertFalse(self.loop._ready)
212
213    def test_set_default_executor(self):
214        class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
215            def submit(self, fn, *args, **kwargs):
216                raise NotImplementedError(
217                    'cannot submit into a dummy executor')
218
219        executor = DummyExecutor()
220        self.loop.set_default_executor(executor)
221        self.assertIs(executor, self.loop._default_executor)
222
223    def test_set_default_executor_deprecation_warnings(self):
224        executor = mock.Mock()
225
226        with self.assertWarns(DeprecationWarning):
227            self.loop.set_default_executor(executor)
228
229    def test_call_soon(self):
230        def cb():
231            pass
232
233        h = self.loop.call_soon(cb)
234        self.assertEqual(h._callback, cb)
235        self.assertIsInstance(h, asyncio.Handle)
236        self.assertIn(h, self.loop._ready)
237
238    def test_call_soon_non_callable(self):
239        self.loop.set_debug(True)
240        with self.assertRaisesRegex(TypeError, 'a callable object'):
241            self.loop.call_soon(1)
242
243    def test_call_later(self):
244        def cb():
245            pass
246
247        h = self.loop.call_later(10.0, cb)
248        self.assertIsInstance(h, asyncio.TimerHandle)
249        self.assertIn(h, self.loop._scheduled)
250        self.assertNotIn(h, self.loop._ready)
251
252    def test_call_later_negative_delays(self):
253        calls = []
254
255        def cb(arg):
256            calls.append(arg)
257
258        self.loop._process_events = mock.Mock()
259        self.loop.call_later(-1, cb, 'a')
260        self.loop.call_later(-2, cb, 'b')
261        test_utils.run_briefly(self.loop)
262        self.assertEqual(calls, ['b', 'a'])
263
264    def test_time_and_call_at(self):
265        def cb():
266            self.loop.stop()
267
268        self.loop._process_events = mock.Mock()
269        delay = 0.1
270
271        when = self.loop.time() + delay
272        self.loop.call_at(when, cb)
273        t0 = self.loop.time()
274        self.loop.run_forever()
275        dt = self.loop.time() - t0
276
277        # 50 ms: maximum granularity of the event loop
278        self.assertGreaterEqual(dt, delay - 0.050, dt)
279        # tolerate a difference of +800 ms because some Python buildbots
280        # are really slow
281        self.assertLessEqual(dt, 0.9, dt)
282
283    def check_thread(self, loop, debug):
284        def cb():
285            pass
286
287        loop.set_debug(debug)
288        if debug:
289            msg = ("Non-thread-safe operation invoked on an event loop other "
290                   "than the current one")
291            with self.assertRaisesRegex(RuntimeError, msg):
292                loop.call_soon(cb)
293            with self.assertRaisesRegex(RuntimeError, msg):
294                loop.call_later(60, cb)
295            with self.assertRaisesRegex(RuntimeError, msg):
296                loop.call_at(loop.time() + 60, cb)
297        else:
298            loop.call_soon(cb)
299            loop.call_later(60, cb)
300            loop.call_at(loop.time() + 60, cb)
301
302    def test_check_thread(self):
303        def check_in_thread(loop, event, debug, create_loop, fut):
304            # wait until the event loop is running
305            event.wait()
306
307            try:
308                if create_loop:
309                    loop2 = base_events.BaseEventLoop()
310                    try:
311                        asyncio.set_event_loop(loop2)
312                        self.check_thread(loop, debug)
313                    finally:
314                        asyncio.set_event_loop(None)
315                        loop2.close()
316                else:
317                    self.check_thread(loop, debug)
318            except Exception as exc:
319                loop.call_soon_threadsafe(fut.set_exception, exc)
320            else:
321                loop.call_soon_threadsafe(fut.set_result, None)
322
323        def test_thread(loop, debug, create_loop=False):
324            event = threading.Event()
325            fut = loop.create_future()
326            loop.call_soon(event.set)
327            args = (loop, event, debug, create_loop, fut)
328            thread = threading.Thread(target=check_in_thread, args=args)
329            thread.start()
330            loop.run_until_complete(fut)
331            thread.join()
332
333        self.loop._process_events = mock.Mock()
334        self.loop._write_to_self = mock.Mock()
335
336        # raise RuntimeError if the thread has no event loop
337        test_thread(self.loop, True)
338
339        # check disabled if debug mode is disabled
340        test_thread(self.loop, False)
341
342        # raise RuntimeError if the event loop of the thread is not the called
343        # event loop
344        test_thread(self.loop, True, create_loop=True)
345
346        # check disabled if debug mode is disabled
347        test_thread(self.loop, False, create_loop=True)
348
349    def test__run_once(self):
350        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
351                                 self.loop, None)
352        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
353                                 self.loop, None)
354
355        h1.cancel()
356
357        self.loop._process_events = mock.Mock()
358        self.loop._scheduled.append(h1)
359        self.loop._scheduled.append(h2)
360        self.loop._run_once()
361
362        t = self.loop._selector.select.call_args[0][0]
363        self.assertTrue(9.5 < t < 10.5, t)
364        self.assertEqual([h2], self.loop._scheduled)
365        self.assertTrue(self.loop._process_events.called)
366
367    def test_set_debug(self):
368        self.loop.set_debug(True)
369        self.assertTrue(self.loop.get_debug())
370        self.loop.set_debug(False)
371        self.assertFalse(self.loop.get_debug())
372
373    def test__run_once_schedule_handle(self):
374        handle = None
375        processed = False
376
377        def cb(loop):
378            nonlocal processed, handle
379            processed = True
380            handle = loop.call_soon(lambda: True)
381
382        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
383                                self.loop, None)
384
385        self.loop._process_events = mock.Mock()
386        self.loop._scheduled.append(h)
387        self.loop._run_once()
388
389        self.assertTrue(processed)
390        self.assertEqual([handle], list(self.loop._ready))
391
392    def test__run_once_cancelled_event_cleanup(self):
393        self.loop._process_events = mock.Mock()
394
395        self.assertTrue(
396            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
397
398        def cb():
399            pass
400
401        # Set up one "blocking" event that will not be cancelled to
402        # ensure later cancelled events do not make it to the head
403        # of the queue and get cleaned.
404        not_cancelled_count = 1
405        self.loop.call_later(3000, cb)
406
407        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
408        # cancelled handles, ensure they aren't removed
409
410        cancelled_count = 2
411        for x in range(2):
412            h = self.loop.call_later(3600, cb)
413            h.cancel()
414
415        # Add some cancelled events that will be at head and removed
416        cancelled_count += 2
417        for x in range(2):
418            h = self.loop.call_later(100, cb)
419            h.cancel()
420
421        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
422        self.assertLessEqual(cancelled_count + not_cancelled_count,
423            base_events._MIN_SCHEDULED_TIMER_HANDLES)
424
425        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
426
427        self.loop._run_once()
428
429        cancelled_count -= 2
430
431        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
432
433        self.assertEqual(len(self.loop._scheduled),
434            cancelled_count + not_cancelled_count)
435
436        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
437        # so that deletion of cancelled events will occur on next _run_once
438        add_cancel_count = int(math.ceil(
439            base_events._MIN_SCHEDULED_TIMER_HANDLES *
440            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
441
442        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
443            add_cancel_count, 0)
444
445        # Add some events that will not be cancelled
446        not_cancelled_count += add_not_cancel_count
447        for x in range(add_not_cancel_count):
448            self.loop.call_later(3600, cb)
449
450        # Add enough cancelled events
451        cancelled_count += add_cancel_count
452        for x in range(add_cancel_count):
453            h = self.loop.call_later(3600, cb)
454            h.cancel()
455
456        # Ensure all handles are still scheduled
457        self.assertEqual(len(self.loop._scheduled),
458            cancelled_count + not_cancelled_count)
459
460        self.loop._run_once()
461
462        # Ensure cancelled events were removed
463        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
464
465        # Ensure only uncancelled events remain scheduled
466        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
467
468    def test_run_until_complete_type_error(self):
469        self.assertRaises(TypeError,
470            self.loop.run_until_complete, 'blah')
471
472    def test_run_until_complete_loop(self):
473        task = self.loop.create_future()
474        other_loop = self.new_test_loop()
475        self.addCleanup(other_loop.close)
476        self.assertRaises(ValueError,
477            other_loop.run_until_complete, task)
478
479    def test_run_until_complete_loop_orphan_future_close_loop(self):
480        class ShowStopper(SystemExit):
481            pass
482
483        async def foo(delay):
484            await asyncio.sleep(delay)
485
486        def throw():
487            raise ShowStopper
488
489        self.loop._process_events = mock.Mock()
490        self.loop.call_soon(throw)
491        with self.assertRaises(ShowStopper):
492            self.loop.run_until_complete(foo(0.1))
493
494        # This call fails if run_until_complete does not clean up
495        # done-callback for the previous future.
496        self.loop.run_until_complete(foo(0.2))
497
498    def test_subprocess_exec_invalid_args(self):
499        args = [sys.executable, '-c', 'pass']
500
501        # missing program parameter (empty args)
502        self.assertRaises(TypeError,
503            self.loop.run_until_complete, self.loop.subprocess_exec,
504            asyncio.SubprocessProtocol)
505
506        # expected multiple arguments, not a list
507        self.assertRaises(TypeError,
508            self.loop.run_until_complete, self.loop.subprocess_exec,
509            asyncio.SubprocessProtocol, args)
510
511        # program arguments must be strings, not int
512        self.assertRaises(TypeError,
513            self.loop.run_until_complete, self.loop.subprocess_exec,
514            asyncio.SubprocessProtocol, sys.executable, 123)
515
516        # universal_newlines, shell, bufsize must not be set
517        self.assertRaises(TypeError,
518        self.loop.run_until_complete, self.loop.subprocess_exec,
519            asyncio.SubprocessProtocol, *args, universal_newlines=True)
520        self.assertRaises(TypeError,
521            self.loop.run_until_complete, self.loop.subprocess_exec,
522            asyncio.SubprocessProtocol, *args, shell=True)
523        self.assertRaises(TypeError,
524            self.loop.run_until_complete, self.loop.subprocess_exec,
525            asyncio.SubprocessProtocol, *args, bufsize=4096)
526
527    def test_subprocess_shell_invalid_args(self):
528        # expected a string, not an int or a list
529        self.assertRaises(TypeError,
530            self.loop.run_until_complete, self.loop.subprocess_shell,
531            asyncio.SubprocessProtocol, 123)
532        self.assertRaises(TypeError,
533            self.loop.run_until_complete, self.loop.subprocess_shell,
534            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
535
536        # universal_newlines, shell, bufsize must not be set
537        self.assertRaises(TypeError,
538            self.loop.run_until_complete, self.loop.subprocess_shell,
539            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
540        self.assertRaises(TypeError,
541            self.loop.run_until_complete, self.loop.subprocess_shell,
542            asyncio.SubprocessProtocol, 'exit 0', shell=True)
543        self.assertRaises(TypeError,
544            self.loop.run_until_complete, self.loop.subprocess_shell,
545            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
546
547    def test_default_exc_handler_callback(self):
548        self.loop._process_events = mock.Mock()
549
550        def zero_error(fut):
551            fut.set_result(True)
552            1/0
553
554        # Test call_soon (events.Handle)
555        with mock.patch('asyncio.base_events.logger') as log:
556            fut = self.loop.create_future()
557            self.loop.call_soon(zero_error, fut)
558            fut.add_done_callback(lambda fut: self.loop.stop())
559            self.loop.run_forever()
560            log.error.assert_called_with(
561                test_utils.MockPattern('Exception in callback.*zero'),
562                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
563
564        # Test call_later (events.TimerHandle)
565        with mock.patch('asyncio.base_events.logger') as log:
566            fut = self.loop.create_future()
567            self.loop.call_later(0.01, zero_error, fut)
568            fut.add_done_callback(lambda fut: self.loop.stop())
569            self.loop.run_forever()
570            log.error.assert_called_with(
571                test_utils.MockPattern('Exception in callback.*zero'),
572                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
573
574    def test_default_exc_handler_coro(self):
575        self.loop._process_events = mock.Mock()
576
577        async def zero_error_coro():
578            await asyncio.sleep(0.01)
579            1/0
580
581        # Test Future.__del__
582        with mock.patch('asyncio.base_events.logger') as log:
583            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
584            fut.add_done_callback(lambda *args: self.loop.stop())
585            self.loop.run_forever()
586            fut = None # Trigger Future.__del__ or futures._TracebackLogger
587            support.gc_collect()
588            if PY34:
589                # Future.__del__ in Python 3.4 logs error with
590                # an actual exception context
591                log.error.assert_called_with(
592                    test_utils.MockPattern('.*exception was never retrieved'),
593                    exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
594            else:
595                # futures._TracebackLogger logs only textual traceback
596                log.error.assert_called_with(
597                    test_utils.MockPattern(
598                        '.*exception was never retrieved.*ZeroDiv'),
599                    exc_info=False)
600
601    def test_set_exc_handler_invalid(self):
602        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
603            self.loop.set_exception_handler('spam')
604
605    def test_set_exc_handler_custom(self):
606        def zero_error():
607            1/0
608
609        def run_loop():
610            handle = self.loop.call_soon(zero_error)
611            self.loop._run_once()
612            return handle
613
614        self.loop.set_debug(True)
615        self.loop._process_events = mock.Mock()
616
617        self.assertIsNone(self.loop.get_exception_handler())
618        mock_handler = mock.Mock()
619        self.loop.set_exception_handler(mock_handler)
620        self.assertIs(self.loop.get_exception_handler(), mock_handler)
621        handle = run_loop()
622        mock_handler.assert_called_with(self.loop, {
623            'exception': MOCK_ANY,
624            'message': test_utils.MockPattern(
625                                'Exception in callback.*zero_error'),
626            'handle': handle,
627            'source_traceback': handle._source_traceback,
628        })
629        mock_handler.reset_mock()
630
631        self.loop.set_exception_handler(None)
632        with mock.patch('asyncio.base_events.logger') as log:
633            run_loop()
634            log.error.assert_called_with(
635                        test_utils.MockPattern(
636                                'Exception in callback.*zero'),
637                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
638
639        assert not mock_handler.called
640
641    def test_set_exc_handler_broken(self):
642        def run_loop():
643            def zero_error():
644                1/0
645            self.loop.call_soon(zero_error)
646            self.loop._run_once()
647
648        def handler(loop, context):
649            raise AttributeError('spam')
650
651        self.loop._process_events = mock.Mock()
652
653        self.loop.set_exception_handler(handler)
654
655        with mock.patch('asyncio.base_events.logger') as log:
656            run_loop()
657            log.error.assert_called_with(
658                test_utils.MockPattern(
659                    'Unhandled error in exception handler'),
660                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
661
662    def test_default_exc_handler_broken(self):
663        _context = None
664
665        class Loop(base_events.BaseEventLoop):
666
667            _selector = mock.Mock()
668            _process_events = mock.Mock()
669
670            def default_exception_handler(self, context):
671                nonlocal _context
672                _context = context
673                # Simulates custom buggy "default_exception_handler"
674                raise ValueError('spam')
675
676        loop = Loop()
677        self.addCleanup(loop.close)
678        asyncio.set_event_loop(loop)
679
680        def run_loop():
681            def zero_error():
682                1/0
683            loop.call_soon(zero_error)
684            loop._run_once()
685
686        with mock.patch('asyncio.base_events.logger') as log:
687            run_loop()
688            log.error.assert_called_with(
689                'Exception in default exception handler',
690                exc_info=True)
691
692        def custom_handler(loop, context):
693            raise ValueError('ham')
694
695        _context = None
696        loop.set_exception_handler(custom_handler)
697        with mock.patch('asyncio.base_events.logger') as log:
698            run_loop()
699            log.error.assert_called_with(
700                test_utils.MockPattern('Exception in default exception.*'
701                                       'while handling.*in custom'),
702                exc_info=True)
703
704            # Check that original context was passed to default
705            # exception handler.
706            self.assertIn('context', _context)
707            self.assertIs(type(_context['context']['exception']),
708                          ZeroDivisionError)
709
710    def test_set_task_factory_invalid(self):
711        with self.assertRaisesRegex(
712            TypeError, 'task factory must be a callable or None'):
713
714            self.loop.set_task_factory(1)
715
716        self.assertIsNone(self.loop.get_task_factory())
717
718    def test_set_task_factory(self):
719        self.loop._process_events = mock.Mock()
720
721        class MyTask(asyncio.Task):
722            pass
723
724        async def coro():
725            pass
726
727        factory = lambda loop, coro: MyTask(coro, loop=loop)
728
729        self.assertIsNone(self.loop.get_task_factory())
730        self.loop.set_task_factory(factory)
731        self.assertIs(self.loop.get_task_factory(), factory)
732
733        task = self.loop.create_task(coro())
734        self.assertTrue(isinstance(task, MyTask))
735        self.loop.run_until_complete(task)
736
737        self.loop.set_task_factory(None)
738        self.assertIsNone(self.loop.get_task_factory())
739
740        task = self.loop.create_task(coro())
741        self.assertTrue(isinstance(task, asyncio.Task))
742        self.assertFalse(isinstance(task, MyTask))
743        self.loop.run_until_complete(task)
744
745    def test_env_var_debug(self):
746        code = '\n'.join((
747            'import asyncio',
748            'loop = asyncio.get_event_loop()',
749            'print(loop.get_debug())'))
750
751        # Test with -E to not fail if the unit test was run with
752        # PYTHONASYNCIODEBUG set to a non-empty string
753        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
754        self.assertEqual(stdout.rstrip(), b'False')
755
756        sts, stdout, stderr = assert_python_ok('-c', code,
757                                               PYTHONASYNCIODEBUG='',
758                                               PYTHONDEVMODE='')
759        self.assertEqual(stdout.rstrip(), b'False')
760
761        sts, stdout, stderr = assert_python_ok('-c', code,
762                                               PYTHONASYNCIODEBUG='1',
763                                               PYTHONDEVMODE='')
764        self.assertEqual(stdout.rstrip(), b'True')
765
766        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
767                                               PYTHONASYNCIODEBUG='1')
768        self.assertEqual(stdout.rstrip(), b'False')
769
770        # -X dev
771        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
772                                               '-c', code)
773        self.assertEqual(stdout.rstrip(), b'True')
774
775    def test_create_task(self):
776        class MyTask(asyncio.Task):
777            pass
778
779        async def test():
780            pass
781
782        class EventLoop(base_events.BaseEventLoop):
783            def create_task(self, coro):
784                return MyTask(coro, loop=loop)
785
786        loop = EventLoop()
787        self.set_event_loop(loop)
788
789        coro = test()
790        task = asyncio.ensure_future(coro, loop=loop)
791        self.assertIsInstance(task, MyTask)
792
793        # make warnings quiet
794        task._log_destroy_pending = False
795        coro.close()
796
797    def test_create_named_task_with_default_factory(self):
798        async def test():
799            pass
800
801        loop = asyncio.new_event_loop()
802        task = loop.create_task(test(), name='test_task')
803        try:
804            self.assertEqual(task.get_name(), 'test_task')
805        finally:
806            loop.run_until_complete(task)
807            loop.close()
808
809    def test_create_named_task_with_custom_factory(self):
810        def task_factory(loop, coro):
811            return asyncio.Task(coro, loop=loop)
812
813        async def test():
814            pass
815
816        loop = asyncio.new_event_loop()
817        loop.set_task_factory(task_factory)
818        task = loop.create_task(test(), name='test_task')
819        try:
820            self.assertEqual(task.get_name(), 'test_task')
821        finally:
822            loop.run_until_complete(task)
823            loop.close()
824
    def test_run_forever_keyboard_interrupt(self):
        # Python issue #22601: ensure that the temporary task created by
        # run_forever() consumes the KeyboardInterrupt and so doesn't log
        # a warning
        async def raise_keyboard_interrupt():
            raise KeyboardInterrupt

        self.loop._process_events = mock.Mock()
        # Swap in a mock so that any call to the exception handler can be
        # detected by the final assertion.
        self.loop.call_exception_handler = mock.Mock()

        try:
            self.loop.run_until_complete(raise_keyboard_interrupt())
        except KeyboardInterrupt:
            pass
        self.loop.close()
        # Collect garbage before checking -- presumably so that anything
        # reported when the finished task is destroyed has already happened.
        support.gc_collect()

        self.assertFalse(self.loop.call_exception_handler.called)
843
844    def test_run_until_complete_baseexception(self):
845        # Python issue #22429: run_until_complete() must not schedule a pending
846        # call to stop() if the future raised a BaseException
847        async def raise_keyboard_interrupt():
848            raise KeyboardInterrupt
849
850        self.loop._process_events = mock.Mock()
851
852        try:
853            self.loop.run_until_complete(raise_keyboard_interrupt())
854        except KeyboardInterrupt:
855            pass
856
857        def func():
858            self.loop.stop()
859            func.called = True
860        func.called = False
861        try:
862            self.loop.call_soon(func)
863            self.loop.run_forever()
864        except KeyboardInterrupt:
865            pass
866        self.assertTrue(func.called)
867
    def test_single_selecter_event_callback_after_stopping(self):
        # Python issue #25593: A stopped event loop may cause event callbacks
        # to run more than once.
        event_sentinel = object()
        callcount = 0
        doer = None

        def proc_events(event_list):
            # Replacement for loop._process_events: schedule do_event
            # whenever the selector result contains the sentinel, and
            # remember the returned handle in `doer`.
            nonlocal doer
            if event_sentinel in event_list:
                doer = self.loop.call_soon(do_event)

        def do_event():
            # Count the invocation, then schedule clear_selector.
            nonlocal callcount
            callcount += 1
            self.loop.call_soon(clear_selector)

        def clear_selector():
            # Cancel the handle most recently stored in `doer` and make the
            # selector's select() report no events from now on.
            doer.cancel()
            self.loop._selector.select.return_value = ()

        self.loop._process_events = proc_events
        self.loop._selector.select.return_value = (event_sentinel,)

        # Start and stop the loop twice; do_event must have run exactly
        # once in total after each round.
        for i in range(1, 3):
            with self.subTest('Loop %d/2' % i):
                self.loop.call_soon(self.loop.stop)
                self.loop.run_forever()
                self.assertEqual(callcount, 1)
897
898    def test_run_once(self):
899        # Simple test for test_utils.run_once().  It may seem strange
900        # to have a test for this (the function isn't even used!) but
901        # it's a de-factor standard API for library tests.  This tests
902        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
903        count = 0
904
905        def callback():
906            nonlocal count
907            count += 1
908
909        self.loop._process_events = mock.Mock()
910        self.loop.call_soon(callback)
911        test_utils.run_once(self.loop)
912        self.assertEqual(count, 1)
913
914    def test_run_forever_pre_stopped(self):
915        # Test that the old idiom for pre-stopping the loop works.
916        self.loop._process_events = mock.Mock()
917        self.loop.stop()
918        self.loop.run_forever()
919        self.loop._selector.select.assert_called_once_with(0)
920
    async def leave_unfinalized_asyncgen(self):
        # Create an async generator, iterate it partially, and leave it
        # to be garbage collected.
        # Used in async generator finalization tests.
        # Depends on implementation details of garbage collector. Changes
        # in gc may break this function.
        #
        # Returns the `status` dict.  Its flags are set when the generator
        # body starts ('started'), when iteration reaches 'THREE'
        # ('stopped'), and when the generator's finally clause runs
        # ('finalized').
        status = {'started': False,
                  'stopped': False,
                  'finalized': False}

        async def agen():
            status['started'] = True
            try:
                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
                    yield item
            finally:
                status['finalized'] = True

        ag = agen()
        ai = ag.__aiter__()

        async def iter_one():
            # Pull a single item.  Unless the generator is exhausted or
            # 'THREE' was reached, chain a new task that pulls the next one,
            # so each item is consumed by a separate task.
            try:
                item = await ai.__anext__()
            except StopAsyncIteration:
                return
            if item == 'THREE':
                status['stopped'] = True
                return
            asyncio.create_task(iter_one())

        # Only the first task is started here; the caller has to keep the
        # loop running until status['stopped'] becomes true.
        asyncio.create_task(iter_one())
        return status
954
    def test_asyncgen_finalization_by_gc(self):
        # Async generators should be finalized when garbage collected.
        self.loop._process_events = mock.Mock()
        self.loop._write_to_self = mock.Mock()
        # Automatic collection is switched off for the duration of the test,
        # presumably so that collection only happens at the explicit
        # gc_collect() call below.
        with support.disable_gc():
            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
            # Keep stepping the loop until the chained iteration tasks have
            # reached the item at which they stop.
            while not status['stopped']:
                test_utils.run_briefly(self.loop)
            self.assertTrue(status['started'])
            self.assertTrue(status['stopped'])
            self.assertFalse(status['finalized'])
            support.gc_collect()
            # Give the loop one more turn before checking for finalization.
            test_utils.run_briefly(self.loop)
            self.assertTrue(status['finalized'])
969
    def test_asyncgen_finalization_by_gc_in_other_thread(self):
        # Python issue 34769: If garbage collector runs in another
        # thread, async generators will not finalize in debug
        # mode.
        self.loop._process_events = mock.Mock()
        self.loop._write_to_self = mock.Mock()
        self.loop.set_debug(True)
        # Automatic collection is switched off for the duration of the test,
        # presumably so that collection only happens in the executor call
        # below.
        with support.disable_gc():
            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
            # Keep stepping the loop until the chained iteration tasks have
            # reached the item at which they stop.
            while not status['stopped']:
                test_utils.run_briefly(self.loop)
            self.assertTrue(status['started'])
            self.assertTrue(status['stopped'])
            self.assertFalse(status['finalized'])
            # Run the collection through the loop's default executor, i.e.
            # not directly from this test function.
            self.loop.run_until_complete(
                self.loop.run_in_executor(None, support.gc_collect))
            # Give the loop one more turn before checking for finalization.
            test_utils.run_briefly(self.loop)
            self.assertTrue(status['finalized'])
988
989
class MyProto(asyncio.Protocol):
    """Stream protocol that records its life cycle for the tests.

    ``state`` moves from 'INITIAL' to 'CONNECTED', optionally to 'EOF', and
    finally to 'CLOSED'; ``nbytes`` counts the bytes received.  When
    constructed with ``create_future=True`` (which requires a running event
    loop), ``done`` is a future that is resolved on connection loss.
    """

    done = None

    def __init__(self, create_future=False):
        self.nbytes = 0
        self.state = 'INITIAL'
        if create_future:
            running_loop = asyncio.get_running_loop()
            self.done = running_loop.create_future()

    def _require_state(self, *allowed):
        # Fail with the current state as the message if a callback arrives
        # out of order.
        assert self.state in allowed, self.state

    def connection_made(self, transport):
        """Remember the transport and immediately send a canned request."""
        self.transport = transport
        self._require_state('INITIAL')
        self.state = 'CONNECTED'
        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')

    def data_received(self, data):
        """Add the size of *data* to the running byte count."""
        self._require_state('CONNECTED')
        self.nbytes += len(data)

    def eof_received(self):
        """Record that the peer has sent EOF."""
        self._require_state('CONNECTED')
        self.state = 'EOF'

    def connection_lost(self, exc):
        """Mark the protocol closed and resolve ``done`` if there is one."""
        self._require_state('CONNECTED', 'EOF')
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
1018
1019
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol that records its life cycle for the tests.

    ``state`` moves from 'INITIAL' to 'INITIALIZED' to 'CLOSED'; ``nbytes``
    counts the bytes received.  When constructed with ``create_future=True``,
    ``done`` is a future that is resolved on connection loss.  The future is
    created on *loop* or, if *loop* is None, on the running event loop.
    """

    done = None

    def __init__(self, create_future=False, loop=None):
        self.state = 'INITIAL'
        self.nbytes = 0
        if create_future:
            if loop is None:
                # Same fallback as MyProto: use the running loop rather than
                # failing with AttributeError on None.
                loop = asyncio.get_running_loop()
            self.done = loop.create_future()

    def connection_made(self, transport):
        """Remember the transport and mark the protocol initialized."""
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        """Add the size of *data* to the running byte count."""
        assert self.state == 'INITIALIZED', self.state
        self.nbytes += len(data)

    def error_received(self, exc):
        """Only check that the error arrives while initialized."""
        assert self.state == 'INITIALIZED', self.state

    def connection_lost(self, exc):
        """Mark the protocol closed and resolve ``done`` if there is one."""
        assert self.state == 'INITIALIZED', self.state
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)
1046
1047
1048class BaseEventLoopWithSelectorTests(test_utils.TestCase):
1049
1050    def setUp(self):
1051        super().setUp()
1052        self.loop = asyncio.SelectorEventLoop()
1053        self.set_event_loop(self.loop)
1054
1055    @mock.patch('socket.getnameinfo')
1056    def test_getnameinfo(self, m_gai):
1057        m_gai.side_effect = lambda *args: 42
1058        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
1059        self.assertEqual(r, 42)
1060
1061    @patch_socket
1062    def test_create_connection_multiple_errors(self, m_socket):
1063
1064        class MyProto(asyncio.Protocol):
1065            pass
1066
1067        async def getaddrinfo(*args, **kw):
1068            return [(2, 1, 6, '', ('107.6.106.82', 80)),
1069                    (2, 1, 6, '', ('107.6.106.82', 80))]
1070
1071        def getaddrinfo_task(*args, **kwds):
1072            return self.loop.create_task(getaddrinfo(*args, **kwds))
1073
1074        idx = -1
1075        errors = ['err1', 'err2']
1076
1077        def _socket(*args, **kw):
1078            nonlocal idx, errors
1079            idx += 1
1080            raise OSError(errors[idx])
1081
1082        m_socket.socket = _socket
1083
1084        self.loop.getaddrinfo = getaddrinfo_task
1085
1086        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1087        with self.assertRaises(OSError) as cm:
1088            self.loop.run_until_complete(coro)
1089
1090        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1091
1092    @patch_socket
1093    def test_create_connection_timeout(self, m_socket):
1094        # Ensure that the socket is closed on timeout
1095        sock = mock.Mock()
1096        m_socket.socket.return_value = sock
1097
1098        def getaddrinfo(*args, **kw):
1099            fut = self.loop.create_future()
1100            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1101                    ('127.0.0.1', 80))
1102            fut.set_result([addr])
1103            return fut
1104        self.loop.getaddrinfo = getaddrinfo
1105
1106        with mock.patch.object(self.loop, 'sock_connect',
1107                               side_effect=asyncio.TimeoutError):
1108            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1109            with self.assertRaises(asyncio.TimeoutError):
1110                self.loop.run_until_complete(coro)
1111            self.assertTrue(sock.close.called)
1112
1113    def test_create_connection_host_port_sock(self):
1114        coro = self.loop.create_connection(
1115            MyProto, 'example.com', 80, sock=object())
1116        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1117
1118    def test_create_connection_wrong_sock(self):
1119        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1120        with sock:
1121            coro = self.loop.create_connection(MyProto, sock=sock)
1122            with self.assertRaisesRegex(ValueError,
1123                                        'A Stream Socket was expected'):
1124                self.loop.run_until_complete(coro)
1125
1126    def test_create_server_wrong_sock(self):
1127        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1128        with sock:
1129            coro = self.loop.create_server(MyProto, sock=sock)
1130            with self.assertRaisesRegex(ValueError,
1131                                        'A Stream Socket was expected'):
1132                self.loop.run_until_complete(coro)
1133
1134    def test_create_server_ssl_timeout_for_plain_socket(self):
1135        coro = self.loop.create_server(
1136            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1137        with self.assertRaisesRegex(
1138                ValueError,
1139                'ssl_handshake_timeout is only meaningful with ssl'):
1140            self.loop.run_until_complete(coro)
1141
1142    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1143                         'no socket.SOCK_NONBLOCK (linux only)')
1144    def test_create_server_stream_bittype(self):
1145        sock = socket.socket(
1146            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1147        with sock:
1148            coro = self.loop.create_server(lambda: None, sock=sock)
1149            srv = self.loop.run_until_complete(coro)
1150            srv.close()
1151            self.loop.run_until_complete(srv.wait_closed())
1152
1153    @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support')
1154    def test_create_server_ipv6(self):
1155        async def main():
1156            with self.assertWarns(DeprecationWarning):
1157                srv = await asyncio.start_server(
1158                    lambda: None, '::1', 0, loop=self.loop)
1159            try:
1160                self.assertGreater(len(srv.sockets), 0)
1161            finally:
1162                srv.close()
1163                await srv.wait_closed()
1164
1165        try:
1166            self.loop.run_until_complete(main())
1167        except OSError as ex:
1168            if (hasattr(errno, 'EADDRNOTAVAIL') and
1169                    ex.errno == errno.EADDRNOTAVAIL):
1170                self.skipTest('failed to bind to ::1')
1171            else:
1172                raise
1173
1174    def test_create_datagram_endpoint_wrong_sock(self):
1175        sock = socket.socket(socket.AF_INET)
1176        with sock:
1177            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1178            with self.assertRaisesRegex(ValueError,
1179                                        'A UDP Socket was expected'):
1180                self.loop.run_until_complete(coro)
1181
1182    def test_create_connection_no_host_port_sock(self):
1183        coro = self.loop.create_connection(MyProto)
1184        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1185
1186    def test_create_connection_no_getaddrinfo(self):
1187        async def getaddrinfo(*args, **kw):
1188            return []
1189
1190        def getaddrinfo_task(*args, **kwds):
1191            return self.loop.create_task(getaddrinfo(*args, **kwds))
1192
1193        self.loop.getaddrinfo = getaddrinfo_task
1194        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1195        self.assertRaises(
1196            OSError, self.loop.run_until_complete, coro)
1197
1198    def test_create_connection_connect_err(self):
1199        async def getaddrinfo(*args, **kw):
1200            return [(2, 1, 6, '', ('107.6.106.82', 80))]
1201
1202        def getaddrinfo_task(*args, **kwds):
1203            return self.loop.create_task(getaddrinfo(*args, **kwds))
1204
1205        self.loop.getaddrinfo = getaddrinfo_task
1206        self.loop.sock_connect = mock.Mock()
1207        self.loop.sock_connect.side_effect = OSError
1208
1209        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1210        self.assertRaises(
1211            OSError, self.loop.run_until_complete, coro)
1212
1213    def test_create_connection_multiple(self):
1214        async def getaddrinfo(*args, **kw):
1215            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1216                    (2, 1, 6, '', ('0.0.0.2', 80))]
1217
1218        def getaddrinfo_task(*args, **kwds):
1219            return self.loop.create_task(getaddrinfo(*args, **kwds))
1220
1221        self.loop.getaddrinfo = getaddrinfo_task
1222        self.loop.sock_connect = mock.Mock()
1223        self.loop.sock_connect.side_effect = OSError
1224
1225        coro = self.loop.create_connection(
1226            MyProto, 'example.com', 80, family=socket.AF_INET)
1227        with self.assertRaises(OSError):
1228            self.loop.run_until_complete(coro)
1229
1230    @patch_socket
1231    def test_create_connection_multiple_errors_local_addr(self, m_socket):
1232
1233        def bind(addr):
1234            if addr[0] == '0.0.0.1':
1235                err = OSError('Err')
1236                err.strerror = 'Err'
1237                raise err
1238
1239        m_socket.socket.return_value.bind = bind
1240
1241        async def getaddrinfo(*args, **kw):
1242            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1243                    (2, 1, 6, '', ('0.0.0.2', 80))]
1244
1245        def getaddrinfo_task(*args, **kwds):
1246            return self.loop.create_task(getaddrinfo(*args, **kwds))
1247
1248        self.loop.getaddrinfo = getaddrinfo_task
1249        self.loop.sock_connect = mock.Mock()
1250        self.loop.sock_connect.side_effect = OSError('Err2')
1251
1252        coro = self.loop.create_connection(
1253            MyProto, 'example.com', 80, family=socket.AF_INET,
1254            local_addr=(None, 8080))
1255        with self.assertRaises(OSError) as cm:
1256            self.loop.run_until_complete(coro)
1257
1258        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1259        self.assertTrue(m_socket.socket.return_value.close.called)
1260
1261    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1262        # Test the fallback code, even if this system has inet_pton.
1263        if not allow_inet_pton:
1264            del m_socket.inet_pton
1265
1266        m_socket.getaddrinfo = socket.getaddrinfo
1267        sock = m_socket.socket.return_value
1268
1269        self.loop._add_reader = mock.Mock()
1270        self.loop._add_reader._is_coroutine = False
1271        self.loop._add_writer = mock.Mock()
1272        self.loop._add_writer._is_coroutine = False
1273
1274        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1275        t, p = self.loop.run_until_complete(coro)
1276        try:
1277            sock.connect.assert_called_with(('1.2.3.4', 80))
1278            _, kwargs = m_socket.socket.call_args
1279            self.assertEqual(kwargs['family'], m_socket.AF_INET)
1280            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1281        finally:
1282            t.close()
1283            test_utils.run_briefly(self.loop)  # allow transport to close
1284
1285        if support.IPV6_ENABLED:
1286            sock.family = socket.AF_INET6
1287            coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1288            t, p = self.loop.run_until_complete(coro)
1289            try:
1290                # Without inet_pton we use getaddrinfo, which transforms
1291                # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
1292                # scope id.
1293                [address] = sock.connect.call_args[0]
1294                host, port = address[:2]
1295                self.assertRegex(host, r'::(0\.)*1')
1296                self.assertEqual(port, 80)
1297                _, kwargs = m_socket.socket.call_args
1298                self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1299                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1300            finally:
1301                t.close()
1302                test_utils.run_briefly(self.loop)  # allow transport to close
1303
1304    @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support')
1305    @unittest.skipIf(sys.platform.startswith('aix'),
1306                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
1307    @patch_socket
1308    def test_create_connection_ipv6_scope(self, m_socket):
1309        m_socket.getaddrinfo = socket.getaddrinfo
1310        sock = m_socket.socket.return_value
1311        sock.family = socket.AF_INET6
1312
1313        self.loop._add_reader = mock.Mock()
1314        self.loop._add_reader._is_coroutine = False
1315        self.loop._add_writer = mock.Mock()
1316        self.loop._add_writer._is_coroutine = False
1317
1318        coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
1319        t, p = self.loop.run_until_complete(coro)
1320        try:
1321            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
1322            _, kwargs = m_socket.socket.call_args
1323            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1324            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1325        finally:
1326            t.close()
1327            test_utils.run_briefly(self.loop)  # allow transport to close
1328
1329    @patch_socket
1330    def test_create_connection_ip_addr(self, m_socket):
1331        self._test_create_connection_ip_addr(m_socket, True)
1332
1333    @patch_socket
1334    def test_create_connection_no_inet_pton(self, m_socket):
1335        self._test_create_connection_ip_addr(m_socket, False)
1336
1337    @patch_socket
1338    def test_create_connection_service_name(self, m_socket):
1339        m_socket.getaddrinfo = socket.getaddrinfo
1340        sock = m_socket.socket.return_value
1341
1342        self.loop._add_reader = mock.Mock()
1343        self.loop._add_reader._is_coroutine = False
1344        self.loop._add_writer = mock.Mock()
1345        self.loop._add_writer._is_coroutine = False
1346
1347        for service, port in ('http', 80), (b'http', 80):
1348            coro = self.loop.create_connection(asyncio.Protocol,
1349                                               '127.0.0.1', service)
1350
1351            t, p = self.loop.run_until_complete(coro)
1352            try:
1353                sock.connect.assert_called_with(('127.0.0.1', port))
1354                _, kwargs = m_socket.socket.call_args
1355                self.assertEqual(kwargs['family'], m_socket.AF_INET)
1356                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1357            finally:
1358                t.close()
1359                test_utils.run_briefly(self.loop)  # allow transport to close
1360
1361        for service in 'nonsense', b'nonsense':
1362            coro = self.loop.create_connection(asyncio.Protocol,
1363                                               '127.0.0.1', service)
1364
1365            with self.assertRaises(OSError):
1366                self.loop.run_until_complete(coro)
1367
1368    def test_create_connection_no_local_addr(self):
1369        async def getaddrinfo(host, *args, **kw):
1370            if host == 'example.com':
1371                return [(2, 1, 6, '', ('107.6.106.82', 80)),
1372                        (2, 1, 6, '', ('107.6.106.82', 80))]
1373            else:
1374                return []
1375
1376        def getaddrinfo_task(*args, **kwds):
1377            return self.loop.create_task(getaddrinfo(*args, **kwds))
1378        self.loop.getaddrinfo = getaddrinfo_task
1379
1380        coro = self.loop.create_connection(
1381            MyProto, 'example.com', 80, family=socket.AF_INET,
1382            local_addr=(None, 8080))
1383        self.assertRaises(
1384            OSError, self.loop.run_until_complete, coro)
1385
1386    @patch_socket
1387    def test_create_connection_bluetooth(self, m_socket):
1388        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1389        # we can't recognize an address is resolved, e.g. a Bluetooth address.
1390        addr = ('00:01:02:03:04:05', 1)
1391
1392        def getaddrinfo(host, port, *args, **kw):
1393            assert (host, port) == addr
1394            return [(999, 1, 999, '', (addr, 1))]
1395
1396        m_socket.getaddrinfo = getaddrinfo
1397        sock = m_socket.socket()
1398        coro = self.loop.sock_connect(sock, addr)
1399        self.loop.run_until_complete(coro)
1400
1401    def test_create_connection_ssl_server_hostname_default(self):
1402        self.loop.getaddrinfo = mock.Mock()
1403
1404        def mock_getaddrinfo(*args, **kwds):
1405            f = self.loop.create_future()
1406            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1407                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
1408            return f
1409
1410        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1411        self.loop.sock_connect = mock.Mock()
1412        self.loop.sock_connect.return_value = self.loop.create_future()
1413        self.loop.sock_connect.return_value.set_result(None)
1414        self.loop._make_ssl_transport = mock.Mock()
1415
1416        class _SelectorTransportMock:
1417            _sock = None
1418
1419            def get_extra_info(self, key):
1420                return mock.Mock()
1421
1422            def close(self):
1423                self._sock.close()
1424
1425        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1426                                    **kwds):
1427            waiter.set_result(None)
1428            transport = _SelectorTransportMock()
1429            transport._sock = sock
1430            return transport
1431
1432        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1433        ANY = mock.ANY
1434        handshake_timeout = object()
1435        # First try the default server_hostname.
1436        self.loop._make_ssl_transport.reset_mock()
1437        coro = self.loop.create_connection(
1438                MyProto, 'python.org', 80, ssl=True,
1439                ssl_handshake_timeout=handshake_timeout)
1440        transport, _ = self.loop.run_until_complete(coro)
1441        transport.close()
1442        self.loop._make_ssl_transport.assert_called_with(
1443            ANY, ANY, ANY, ANY,
1444            server_side=False,
1445            server_hostname='python.org',
1446            ssl_handshake_timeout=handshake_timeout)
1447        # Next try an explicit server_hostname.
1448        self.loop._make_ssl_transport.reset_mock()
1449        coro = self.loop.create_connection(
1450                MyProto, 'python.org', 80, ssl=True,
1451                server_hostname='perl.com',
1452                ssl_handshake_timeout=handshake_timeout)
1453        transport, _ = self.loop.run_until_complete(coro)
1454        transport.close()
1455        self.loop._make_ssl_transport.assert_called_with(
1456            ANY, ANY, ANY, ANY,
1457            server_side=False,
1458            server_hostname='perl.com',
1459            ssl_handshake_timeout=handshake_timeout)
1460        # Finally try an explicit empty server_hostname.
1461        self.loop._make_ssl_transport.reset_mock()
1462        coro = self.loop.create_connection(
1463                MyProto, 'python.org', 80, ssl=True,
1464                server_hostname='',
1465                ssl_handshake_timeout=handshake_timeout)
1466        transport, _ = self.loop.run_until_complete(coro)
1467        transport.close()
1468        self.loop._make_ssl_transport.assert_called_with(
1469                ANY, ANY, ANY, ANY,
1470                server_side=False,
1471                server_hostname='',
1472                ssl_handshake_timeout=handshake_timeout)
1473
1474    def test_create_connection_no_ssl_server_hostname_errors(self):
1475        # When not using ssl, server_hostname must be None.
1476        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1477                                           server_hostname='')
1478        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1479        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1480                                           server_hostname='python.org')
1481        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1482
1483    def test_create_connection_ssl_server_hostname_errors(self):
1484        # When using ssl, server_hostname may be None if host is non-empty.
1485        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1486        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1487        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1488        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1489        sock = socket.socket()
1490        coro = self.loop.create_connection(MyProto, None, None,
1491                                           ssl=True, sock=sock)
1492        self.addCleanup(sock.close)
1493        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1494
1495    def test_create_connection_ssl_timeout_for_plain_socket(self):
1496        coro = self.loop.create_connection(
1497            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1498        with self.assertRaisesRegex(
1499                ValueError,
1500                'ssl_handshake_timeout is only meaningful with ssl'):
1501            self.loop.run_until_complete(coro)
1502
1503    def test_create_server_empty_host(self):
1504        # if host is empty string use None instead
1505        host = object()
1506
1507        async def getaddrinfo(*args, **kw):
1508            nonlocal host
1509            host = args[0]
1510            return []
1511
1512        def getaddrinfo_task(*args, **kwds):
1513            return self.loop.create_task(getaddrinfo(*args, **kwds))
1514
1515        self.loop.getaddrinfo = getaddrinfo_task
1516        fut = self.loop.create_server(MyProto, '', 0)
1517        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1518        self.assertIsNone(host)
1519
1520    def test_create_server_host_port_sock(self):
1521        fut = self.loop.create_server(
1522            MyProto, '0.0.0.0', 0, sock=object())
1523        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1524
1525    def test_create_server_no_host_port_sock(self):
1526        fut = self.loop.create_server(MyProto)
1527        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1528
1529    def test_create_server_no_getaddrinfo(self):
1530        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1531        getaddrinfo.return_value = self.loop.create_future()
1532        getaddrinfo.return_value.set_result(None)
1533
1534        f = self.loop.create_server(MyProto, 'python.org', 0)
1535        self.assertRaises(OSError, self.loop.run_until_complete, f)
1536
1537    @patch_socket
1538    def test_create_server_nosoreuseport(self, m_socket):
1539        m_socket.getaddrinfo = socket.getaddrinfo
1540        del m_socket.SO_REUSEPORT
1541        m_socket.socket.return_value = mock.Mock()
1542
1543        f = self.loop.create_server(
1544            MyProto, '0.0.0.0', 0, reuse_port=True)
1545
1546        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1547
1548    @patch_socket
1549    def test_create_server_soreuseport_only_defined(self, m_socket):
1550        m_socket.getaddrinfo = socket.getaddrinfo
1551        m_socket.socket.return_value = mock.Mock()
1552        m_socket.SO_REUSEPORT = -1
1553
1554        f = self.loop.create_server(
1555            MyProto, '0.0.0.0', 0, reuse_port=True)
1556
1557        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1558
1559    @patch_socket
1560    def test_create_server_cant_bind(self, m_socket):
1561
1562        class Err(OSError):
1563            strerror = 'error'
1564
1565        m_socket.getaddrinfo.return_value = [
1566            (2, 1, 6, '', ('127.0.0.1', 10100))]
1567        m_socket.getaddrinfo._is_coroutine = False
1568        m_sock = m_socket.socket.return_value = mock.Mock()
1569        m_sock.bind.side_effect = Err
1570
1571        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1572        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1573        self.assertTrue(m_sock.close.called)
1574
1575    @patch_socket
1576    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1577        m_socket.getaddrinfo.return_value = []
1578        m_socket.getaddrinfo._is_coroutine = False
1579
1580        coro = self.loop.create_datagram_endpoint(
1581            MyDatagramProto, local_addr=('localhost', 0))
1582        self.assertRaises(
1583            OSError, self.loop.run_until_complete, coro)
1584
1585    def test_create_datagram_endpoint_addr_error(self):
1586        coro = self.loop.create_datagram_endpoint(
1587            MyDatagramProto, local_addr='localhost')
1588        self.assertRaises(
1589            AssertionError, self.loop.run_until_complete, coro)
1590        coro = self.loop.create_datagram_endpoint(
1591            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1592        self.assertRaises(
1593            AssertionError, self.loop.run_until_complete, coro)
1594
1595    def test_create_datagram_endpoint_connect_err(self):
1596        self.loop.sock_connect = mock.Mock()
1597        self.loop.sock_connect.side_effect = OSError
1598
1599        coro = self.loop.create_datagram_endpoint(
1600            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1601        self.assertRaises(
1602            OSError, self.loop.run_until_complete, coro)
1603
1604    def test_create_datagram_endpoint_allow_broadcast(self):
1605        protocol = MyDatagramProto(create_future=True, loop=self.loop)
1606        self.loop.sock_connect = sock_connect = mock.Mock()
1607        sock_connect.return_value = []
1608
1609        coro = self.loop.create_datagram_endpoint(
1610            lambda: protocol,
1611            remote_addr=('127.0.0.1', 0),
1612            allow_broadcast=True)
1613
1614        transport, _ = self.loop.run_until_complete(coro)
1615        self.assertFalse(sock_connect.called)
1616
1617        transport.close()
1618        self.loop.run_until_complete(protocol.done)
1619        self.assertEqual('CLOSED', protocol.state)
1620
1621    @patch_socket
1622    def test_create_datagram_endpoint_socket_err(self, m_socket):
1623        m_socket.getaddrinfo = socket.getaddrinfo
1624        m_socket.socket.side_effect = OSError
1625
1626        coro = self.loop.create_datagram_endpoint(
1627            asyncio.DatagramProtocol, family=socket.AF_INET)
1628        self.assertRaises(
1629            OSError, self.loop.run_until_complete, coro)
1630
1631        coro = self.loop.create_datagram_endpoint(
1632            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1633        self.assertRaises(
1634            OSError, self.loop.run_until_complete, coro)
1635
1636    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1637    def test_create_datagram_endpoint_no_matching_family(self):
1638        coro = self.loop.create_datagram_endpoint(
1639            asyncio.DatagramProtocol,
1640            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1641        self.assertRaises(
1642            ValueError, self.loop.run_until_complete, coro)
1643
1644    @patch_socket
1645    def test_create_datagram_endpoint_setblk_err(self, m_socket):
1646        m_socket.socket.return_value.setblocking.side_effect = OSError
1647
1648        coro = self.loop.create_datagram_endpoint(
1649            asyncio.DatagramProtocol, family=socket.AF_INET)
1650        self.assertRaises(
1651            OSError, self.loop.run_until_complete, coro)
1652        self.assertTrue(
1653            m_socket.socket.return_value.close.called)
1654
1655    def test_create_datagram_endpoint_noaddr_nofamily(self):
1656        coro = self.loop.create_datagram_endpoint(
1657            asyncio.DatagramProtocol)
1658        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1659
1660    @patch_socket
1661    def test_create_datagram_endpoint_cant_bind(self, m_socket):
1662        class Err(OSError):
1663            pass
1664
1665        m_socket.getaddrinfo = socket.getaddrinfo
1666        m_sock = m_socket.socket.return_value = mock.Mock()
1667        m_sock.bind.side_effect = Err
1668
1669        fut = self.loop.create_datagram_endpoint(
1670            MyDatagramProto,
1671            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1672        self.assertRaises(Err, self.loop.run_until_complete, fut)
1673        self.assertTrue(m_sock.close.called)
1674
1675    def test_create_datagram_endpoint_sock(self):
1676        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1677        sock.bind(('127.0.0.1', 0))
1678        fut = self.loop.create_datagram_endpoint(
1679            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1680            sock=sock)
1681        transport, protocol = self.loop.run_until_complete(fut)
1682        transport.close()
1683        self.loop.run_until_complete(protocol.done)
1684        self.assertEqual('CLOSED', protocol.state)
1685
1686    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1687    def test_create_datagram_endpoint_sock_unix(self):
1688        fut = self.loop.create_datagram_endpoint(
1689            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1690            family=socket.AF_UNIX)
1691        transport, protocol = self.loop.run_until_complete(fut)
1692        assert transport._sock.family == socket.AF_UNIX
1693        transport.close()
1694        self.loop.run_until_complete(protocol.done)
1695        self.assertEqual('CLOSED', protocol.state)
1696
1697    @support.skip_unless_bind_unix_socket
1698    def test_create_datagram_endpoint_existing_sock_unix(self):
1699        with test_utils.unix_socket_path() as path:
1700            sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
1701            sock.bind(path)
1702            sock.close()
1703
1704            coro = self.loop.create_datagram_endpoint(
1705                lambda: MyDatagramProto(create_future=True, loop=self.loop),
1706                path, family=socket.AF_UNIX)
1707            transport, protocol = self.loop.run_until_complete(coro)
1708            transport.close()
1709            self.loop.run_until_complete(protocol.done)
1710
1711    def test_create_datagram_endpoint_sock_sockopts(self):
1712        class FakeSock:
1713            type = socket.SOCK_DGRAM
1714
1715        fut = self.loop.create_datagram_endpoint(
1716            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1717        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1718
1719        fut = self.loop.create_datagram_endpoint(
1720            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1721        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1722
1723        fut = self.loop.create_datagram_endpoint(
1724            MyDatagramProto, family=1, sock=FakeSock())
1725        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1726
1727        fut = self.loop.create_datagram_endpoint(
1728            MyDatagramProto, proto=1, sock=FakeSock())
1729        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1730
1731        fut = self.loop.create_datagram_endpoint(
1732            MyDatagramProto, flags=1, sock=FakeSock())
1733        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1734
1735        fut = self.loop.create_datagram_endpoint(
1736            MyDatagramProto, reuse_port=True, sock=FakeSock())
1737        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1738
1739        fut = self.loop.create_datagram_endpoint(
1740            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1741        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1742
1743    def test_create_datagram_endpoint_sockopts(self):
1744        # Socket options should not be applied unless asked for.
1745        # SO_REUSEPORT is not available on all platforms.
1746
1747        coro = self.loop.create_datagram_endpoint(
1748            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1749            local_addr=('127.0.0.1', 0))
1750        transport, protocol = self.loop.run_until_complete(coro)
1751        sock = transport.get_extra_info('socket')
1752
1753        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1754
1755        if reuseport_supported:
1756            self.assertFalse(
1757                sock.getsockopt(
1758                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1759        self.assertFalse(
1760            sock.getsockopt(
1761                socket.SOL_SOCKET, socket.SO_BROADCAST))
1762
1763        transport.close()
1764        self.loop.run_until_complete(protocol.done)
1765        self.assertEqual('CLOSED', protocol.state)
1766
1767        coro = self.loop.create_datagram_endpoint(
1768            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1769            local_addr=('127.0.0.1', 0),
1770            reuse_port=reuseport_supported,
1771            allow_broadcast=True)
1772        transport, protocol = self.loop.run_until_complete(coro)
1773        sock = transport.get_extra_info('socket')
1774
1775        self.assertFalse(
1776            sock.getsockopt(
1777                socket.SOL_SOCKET, socket.SO_REUSEADDR))
1778        if reuseport_supported:
1779            self.assertTrue(
1780                sock.getsockopt(
1781                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1782        self.assertTrue(
1783            sock.getsockopt(
1784                socket.SOL_SOCKET, socket.SO_BROADCAST))
1785
1786        transport.close()
1787        self.loop.run_until_complete(protocol.done)
1788        self.assertEqual('CLOSED', protocol.state)
1789
1790    def test_create_datagram_endpoint_reuse_address_error(self):
1791        # bpo-37228: Ensure that explicit passing of `reuse_address=True`
1792        # raises an error, as it is not safe to use SO_REUSEADDR when using UDP
1793
1794        coro = self.loop.create_datagram_endpoint(
1795            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1796            local_addr=('127.0.0.1', 0),
1797            reuse_address=True)
1798
1799        with self.assertRaises(ValueError):
1800            self.loop.run_until_complete(coro)
1801
1802    def test_create_datagram_endpoint_reuse_address_warning(self):
1803        # bpo-37228: Deprecate *reuse_address* parameter
1804
1805        coro = self.loop.create_datagram_endpoint(
1806            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1807            local_addr=('127.0.0.1', 0),
1808            reuse_address=False)
1809
1810        with self.assertWarns(DeprecationWarning):
1811            transport, protocol = self.loop.run_until_complete(coro)
1812            transport.close()
1813            self.loop.run_until_complete(protocol.done)
1814            self.assertEqual('CLOSED', protocol.state)
1815
1816    @patch_socket
1817    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1818        del m_socket.SO_REUSEPORT
1819        m_socket.socket.return_value = mock.Mock()
1820
1821        coro = self.loop.create_datagram_endpoint(
1822            lambda: MyDatagramProto(loop=self.loop),
1823            local_addr=('127.0.0.1', 0),
1824            reuse_port=True)
1825
1826        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1827
1828    @patch_socket
1829    def test_create_datagram_endpoint_ip_addr(self, m_socket):
1830        def getaddrinfo(*args, **kw):
1831            self.fail('should not have called getaddrinfo')
1832
1833        m_socket.getaddrinfo = getaddrinfo
1834        m_socket.socket.return_value.bind = bind = mock.Mock()
1835        self.loop._add_reader = mock.Mock()
1836        self.loop._add_reader._is_coroutine = False
1837
1838        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1839        coro = self.loop.create_datagram_endpoint(
1840            lambda: MyDatagramProto(loop=self.loop),
1841            local_addr=('1.2.3.4', 0),
1842            reuse_port=reuseport_supported)
1843
1844        t, p = self.loop.run_until_complete(coro)
1845        try:
1846            bind.assert_called_with(('1.2.3.4', 0))
1847            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1848                                               proto=m_socket.IPPROTO_UDP,
1849                                               type=m_socket.SOCK_DGRAM)
1850        finally:
1851            t.close()
1852            test_utils.run_briefly(self.loop)  # allow transport to close
1853
1854    def test_accept_connection_retry(self):
1855        sock = mock.Mock()
1856        sock.accept.side_effect = BlockingIOError()
1857
1858        self.loop._accept_connection(MyProto, sock)
1859        self.assertFalse(sock.close.called)
1860
1861    @mock.patch('asyncio.base_events.logger')
1862    def test_accept_connection_exception(self, m_log):
1863        sock = mock.Mock()
1864        sock.fileno.return_value = 10
1865        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1866        self.loop._remove_reader = mock.Mock()
1867        self.loop.call_later = mock.Mock()
1868
1869        self.loop._accept_connection(MyProto, sock)
1870        self.assertTrue(m_log.error.called)
1871        self.assertFalse(sock.close.called)
1872        self.loop._remove_reader.assert_called_with(10)
1873        self.loop.call_later.assert_called_with(
1874            constants.ACCEPT_RETRY_DELAY,
1875            # self.loop._start_serving
1876            mock.ANY,
1877            MyProto, sock, None, None, mock.ANY, mock.ANY)
1878
1879    def test_call_coroutine(self):
1880        with self.assertWarns(DeprecationWarning):
1881            @asyncio.coroutine
1882            def simple_coroutine():
1883                pass
1884
1885        self.loop.set_debug(True)
1886        coro_func = simple_coroutine
1887        coro_obj = coro_func()
1888        self.addCleanup(coro_obj.close)
1889        for func in (coro_func, coro_obj):
1890            with self.assertRaises(TypeError):
1891                self.loop.call_soon(func)
1892            with self.assertRaises(TypeError):
1893                self.loop.call_soon_threadsafe(func)
1894            with self.assertRaises(TypeError):
1895                self.loop.call_later(60, func)
1896            with self.assertRaises(TypeError):
1897                self.loop.call_at(self.loop.time() + 60, func)
1898            with self.assertRaises(TypeError):
1899                self.loop.run_until_complete(
1900                    self.loop.run_in_executor(None, func))
1901
1902    @mock.patch('asyncio.base_events.logger')
1903    def test_log_slow_callbacks(self, m_logger):
1904        def stop_loop_cb(loop):
1905            loop.stop()
1906
1907        async def stop_loop_coro(loop):
1908            loop.stop()
1909
1910        asyncio.set_event_loop(self.loop)
1911        self.loop.set_debug(True)
1912        self.loop.slow_callback_duration = 0.0
1913
1914        # slow callback
1915        self.loop.call_soon(stop_loop_cb, self.loop)
1916        self.loop.run_forever()
1917        fmt, *args = m_logger.warning.call_args[0]
1918        self.assertRegex(fmt % tuple(args),
1919                         "^Executing <Handle.*stop_loop_cb.*> "
1920                         "took .* seconds$")
1921
1922        # slow task
1923        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1924        self.loop.run_forever()
1925        fmt, *args = m_logger.warning.call_args[0]
1926        self.assertRegex(fmt % tuple(args),
1927                         "^Executing <Task.*stop_loop_coro.*> "
1928                         "took .* seconds$")
1929
1930
class RunningLoopTests(unittest.TestCase):

    def test_running_loop_within_a_loop(self):
        # Calling run_forever() on one loop from a coroutine that is being
        # run by another loop must raise RuntimeError.
        inner_loop = asyncio.new_event_loop()
        outer_loop = asyncio.new_event_loop()

        async def run_inner():
            inner_loop.run_forever()

        try:
            self.assertRaisesRegex(
                RuntimeError, 'while another loop is running',
                outer_loop.run_until_complete, run_inner())
        finally:
            inner_loop.close()
            outer_loop.close()
1946
1947
class BaseLoopSockSendfileTests(test_utils.TestCase):
    # Tests for loop.sock_sendfile() on a plain BaseSelectorEventLoop:
    # the missing native path, the fallback path and argument validation.

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        # Server-side protocol: collects every received byte in ``data``
        # and resolves ``fut`` once the connection is lost.

        def __init__(self, loop):
            self.transport = None
            self.started = False
            self.closed = False
            self.data = bytearray()
            self.fut = loop.create_future()

        def connection_made(self, transport):
            self.transport = transport
            self.started = True

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)
            self.transport = None

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        # Shrink the fallback read buffer for the duration of the class and
        # create the file that every test sends.
        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
        with open(support.TESTFN, 'wb') as test_file:
            test_file.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Undo setUpClass(): restore the buffer size, remove the file.
        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
        support.unlink(support.TESTFN)
        super().tearDownClass()

    def setUp(self):
        from asyncio import selector_events
        # BaseSelectorEventLoop() has no native implementation
        self.loop = selector_events.BaseSelectorEventLoop()
        self.set_event_loop(self.loop)
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, blocking=False):
        # Return a TCP socket that is closed automatically at cleanup.
        new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        new_sock.setblocking(blocking)
        self.addCleanup(new_sock.close)
        return new_sock

    def run_loop(self, coro):
        # Run *coro* to completion on the test loop and return its result.
        return self.loop.run_until_complete(coro)

    def prepare(self):
        # Start a server on an ephemeral port, connect a client socket to
        # it and return (client socket, server-side protocol).
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        server = self.run_loop(self.loop.create_server(
            lambda: proto, support.HOST, 0, family=socket.AF_INET))
        addr = server.sockets[0].getsockname()

        # Retry the connection a few times, pausing after each failure.
        for _attempt in range(10):
            try:
                self.run_loop(self.loop.sock_connect(sock, addr))
            except OSError:
                self.run_loop(asyncio.sleep(0.5))
            else:
                break
        else:
            # One last try, so we get the exception
            self.run_loop(self.loop.sock_connect(sock, addr))

        def cleanup():
            server.close()
            self.run_loop(server.wait_closed())
            sock.close()
            if proto.transport is not None:
                proto.transport.close()
                self.run_loop(proto.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test__sock_sendfile_native_failure(self):
        client, proto = self.prepare()

        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "sendfile is not available"):
            self.run_loop(
                self.loop._sock_sendfile_native(client, self.file, 0, None))

        # Nothing was sent and the file position did not move.
        self.assertEqual(proto.data, b'')
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_no_fallback(self):
        client, proto = self.prepare()

        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
                                    "sendfile is not available"):
            self.run_loop(
                self.loop.sock_sendfile(client, self.file, fallback=False))

        self.assertEqual(self.file.tell(), 0)
        self.assertEqual(proto.data, b'')

    def test_sock_sendfile_fallback(self):
        client, proto = self.prepare()
        expected_size = len(self.DATA)

        sent = self.run_loop(self.loop.sock_sendfile(client, self.file))
        client.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(sent, expected_size)
        self.assertEqual(self.file.tell(), expected_size)
        self.assertEqual(proto.data, self.DATA)

    def test_sock_sendfile_fallback_offset_and_count(self):
        client, proto = self.prepare()

        sent = self.run_loop(
            self.loop.sock_sendfile(client, self.file, 1000, 2000))
        client.close()
        self.run_loop(proto.wait_closed())

        self.assertEqual(sent, 2000)
        self.assertEqual(self.file.tell(), 3000)
        self.assertEqual(proto.data, self.DATA[1000:3000])

    def test_blocking_socket(self):
        self.loop.set_debug(True)
        blocking_sock = self.make_socket(blocking=True)
        with self.assertRaisesRegex(ValueError, "must be non-blocking"):
            self.run_loop(self.loop.sock_sendfile(blocking_sock, self.file))

    def test_nonbinary_file(self):
        sock = self.make_socket()
        with open(support.TESTFN, 'r') as text_file:
            with self.assertRaisesRegex(ValueError, "binary mode"):
                self.run_loop(self.loop.sock_sendfile(sock, text_file))

    def test_nonstream_socket(self):
        udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp_sock.setblocking(False)
        self.addCleanup(udp_sock.close)
        with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
            self.run_loop(self.loop.sock_sendfile(udp_sock, self.file))

    def test_notint_count(self):
        sock = self.make_socket()
        with self.assertRaisesRegex(TypeError,
                                    "count must be a positive integer"):
            self.run_loop(
                self.loop.sock_sendfile(sock, self.file, 0, 'count'))

    def test_negative_count(self):
        sock = self.make_socket()
        with self.assertRaisesRegex(ValueError,
                                    "count must be a positive integer"):
            self.run_loop(
                self.loop.sock_sendfile(sock, self.file, 0, -1))

    def test_notint_offset(self):
        sock = self.make_socket()
        with self.assertRaisesRegex(TypeError,
                                    "offset must be a non-negative integer"):
            self.run_loop(
                self.loop.sock_sendfile(sock, self.file, 'offset'))

    def test_negative_offset(self):
        sock = self.make_socket()
        with self.assertRaisesRegex(ValueError,
                                    "offset must be a non-negative integer"):
            self.run_loop(
                self.loop.sock_sendfile(sock, self.file, -1))
2126
2127
class TestSelectorUtils(test_utils.TestCase):
    def check_set_nodelay(self, sock):
        # TCP_NODELAY must be off before base_events._set_nodelay() is
        # called and on afterwards.
        before = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
        self.assertFalse(before)

        base_events._set_nodelay(sock)

        after = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
        self.assertTrue(after)

    @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
                         'need socket.TCP_NODELAY')
    def test_set_nodelay(self):
        # Check a socket left in its default mode first, then one
        # explicitly switched to non-blocking.
        for make_nonblocking in (False, True):
            sock = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_STREAM,
                                 proto=socket.IPPROTO_TCP)
            with sock:
                if make_nonblocking:
                    sock.setblocking(False)
                self.check_set_nodelay(sock)
2151
2152
2153
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
2156