• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for base_events.py"""
2
3import errno
4import logging
5import math
6import os
7import socket
8import sys
9import threading
10import time
11import unittest
12from unittest import mock
13
14import asyncio
15from asyncio import base_events
16from asyncio import constants
17from asyncio import test_utils
18try:
19    from test import support
20except ImportError:
21    from asyncio import test_support as support
22try:
23    from test.support.script_helper import assert_python_ok
24except ImportError:
25    try:
26        from test.script_helper import assert_python_ok
27    except ImportError:
28        from asyncio.test_support import assert_python_ok
29
30
31MOCK_ANY = mock.ANY
32PY34 = sys.version_info >= (3, 4)
33
34
35def mock_socket_module():
36    m_socket = mock.MagicMock(spec=socket)
37    for name in (
38        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
39        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
40    ):
41        if hasattr(socket, name):
42            setattr(m_socket, name, getattr(socket, name))
43        else:
44            delattr(m_socket, name)
45
46    m_socket.socket = mock.MagicMock()
47    m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
48    m_socket.getaddrinfo._is_coroutine = False
49
50    return m_socket
51
52
53def patch_socket(f):
54    return mock.patch('asyncio.base_events.socket',
55                      new_callable=mock_socket_module)(f)
56
57
58class BaseEventTests(test_utils.TestCase):
59
60    def test_ipaddr_info(self):
61        UNSPEC = socket.AF_UNSPEC
62        INET = socket.AF_INET
63        INET6 = socket.AF_INET6
64        STREAM = socket.SOCK_STREAM
65        DGRAM = socket.SOCK_DGRAM
66        TCP = socket.IPPROTO_TCP
67        UDP = socket.IPPROTO_UDP
68
69        self.assertEqual(
70            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
71            base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))
72
73        self.assertEqual(
74            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
75            base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))
76
77        self.assertEqual(
78            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
79            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))
80
81        self.assertEqual(
82            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
83            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))
84
85        # Socket type STREAM implies TCP protocol.
86        self.assertEqual(
87            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
88            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))
89
90        # Socket type DGRAM implies UDP protocol.
91        self.assertEqual(
92            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
93            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))
94
95        # No socket type.
96        self.assertIsNone(
97            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
98
99        # IPv4 address with family IPv6.
100        self.assertIsNone(
101            base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
102
103        self.assertEqual(
104            (INET6, STREAM, TCP, '', ('::3', 1)),
105            base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))
106
107        self.assertEqual(
108            (INET6, STREAM, TCP, '', ('::3', 1)),
109            base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))
110
111        # IPv6 address with family IPv4.
112        self.assertIsNone(
113            base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))
114
115        # IPv6 address with zone index.
116        self.assertIsNone(
117            base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))
118
119        if hasattr(socket, 'SOCK_NONBLOCK'):
120            self.assertEqual(
121                None,
122                base_events._ipaddr_info(
123                    '1.2.3.4', 1, INET, STREAM | socket.SOCK_NONBLOCK, TCP))
124
125
126    def test_port_parameter_types(self):
127        # Test obscure kinds of arguments for "port".
128        INET = socket.AF_INET
129        STREAM = socket.SOCK_STREAM
130        TCP = socket.IPPROTO_TCP
131
132        self.assertEqual(
133            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
134            base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))
135
136        self.assertEqual(
137            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
138            base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))
139
140        self.assertEqual(
141            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
142            base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))
143
144        self.assertEqual(
145            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
146            base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))
147
148        self.assertEqual(
149            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
150            base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))
151
152    @patch_socket
153    def test_ipaddr_info_no_inet_pton(self, m_socket):
154        del m_socket.inet_pton
155        self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
156                                                   socket.AF_INET,
157                                                   socket.SOCK_STREAM,
158                                                   socket.IPPROTO_TCP))
159
160
161class BaseEventLoopTests(test_utils.TestCase):
162
163    def setUp(self):
164        super().setUp()
165        self.loop = base_events.BaseEventLoop()
166        self.loop._selector = mock.Mock()
167        self.loop._selector.select.return_value = ()
168        self.set_event_loop(self.loop)
169
170    def test_not_implemented(self):
171        m = mock.Mock()
172        self.assertRaises(
173            NotImplementedError,
174            self.loop._make_socket_transport, m, m)
175        self.assertRaises(
176            NotImplementedError,
177            self.loop._make_ssl_transport, m, m, m, m)
178        self.assertRaises(
179            NotImplementedError,
180            self.loop._make_datagram_transport, m, m)
181        self.assertRaises(
182            NotImplementedError, self.loop._process_events, [])
183        self.assertRaises(
184            NotImplementedError, self.loop._write_to_self)
185        self.assertRaises(
186            NotImplementedError,
187            self.loop._make_read_pipe_transport, m, m)
188        self.assertRaises(
189            NotImplementedError,
190            self.loop._make_write_pipe_transport, m, m)
191        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
192        with self.assertRaises(NotImplementedError):
193            gen.send(None)
194
195    def test_close(self):
196        self.assertFalse(self.loop.is_closed())
197        self.loop.close()
198        self.assertTrue(self.loop.is_closed())
199
200        # it should be possible to call close() more than once
201        self.loop.close()
202        self.loop.close()
203
204        # operation blocked when the loop is closed
205        f = asyncio.Future(loop=self.loop)
206        self.assertRaises(RuntimeError, self.loop.run_forever)
207        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
208
209    def test__add_callback_handle(self):
210        h = asyncio.Handle(lambda: False, (), self.loop)
211
212        self.loop._add_callback(h)
213        self.assertFalse(self.loop._scheduled)
214        self.assertIn(h, self.loop._ready)
215
216    def test__add_callback_cancelled_handle(self):
217        h = asyncio.Handle(lambda: False, (), self.loop)
218        h.cancel()
219
220        self.loop._add_callback(h)
221        self.assertFalse(self.loop._scheduled)
222        self.assertFalse(self.loop._ready)
223
224    def test_set_default_executor(self):
225        executor = mock.Mock()
226        self.loop.set_default_executor(executor)
227        self.assertIs(executor, self.loop._default_executor)
228
229    def test_getnameinfo(self):
230        sockaddr = mock.Mock()
231        self.loop.run_in_executor = mock.Mock()
232        self.loop.getnameinfo(sockaddr)
233        self.assertEqual(
234            (None, socket.getnameinfo, sockaddr, 0),
235            self.loop.run_in_executor.call_args[0])
236
237    def test_call_soon(self):
238        def cb():
239            pass
240
241        h = self.loop.call_soon(cb)
242        self.assertEqual(h._callback, cb)
243        self.assertIsInstance(h, asyncio.Handle)
244        self.assertIn(h, self.loop._ready)
245
246    def test_call_soon_non_callable(self):
247        self.loop.set_debug(True)
248        with self.assertRaisesRegex(TypeError, 'a callable object'):
249            self.loop.call_soon(1)
250
251    def test_call_later(self):
252        def cb():
253            pass
254
255        h = self.loop.call_later(10.0, cb)
256        self.assertIsInstance(h, asyncio.TimerHandle)
257        self.assertIn(h, self.loop._scheduled)
258        self.assertNotIn(h, self.loop._ready)
259
260    def test_call_later_negative_delays(self):
261        calls = []
262
263        def cb(arg):
264            calls.append(arg)
265
266        self.loop._process_events = mock.Mock()
267        self.loop.call_later(-1, cb, 'a')
268        self.loop.call_later(-2, cb, 'b')
269        test_utils.run_briefly(self.loop)
270        self.assertEqual(calls, ['b', 'a'])
271
272    def test_time_and_call_at(self):
273        def cb():
274            self.loop.stop()
275
276        self.loop._process_events = mock.Mock()
277        delay = 0.1
278
279        when = self.loop.time() + delay
280        self.loop.call_at(when, cb)
281        t0 = self.loop.time()
282        self.loop.run_forever()
283        dt = self.loop.time() - t0
284
285        # 50 ms: maximum granularity of the event loop
286        self.assertGreaterEqual(dt, delay - 0.050, dt)
287        # tolerate a difference of +800 ms because some Python buildbots
288        # are really slow
289        self.assertLessEqual(dt, 0.9, dt)
290
291    def check_thread(self, loop, debug):
292        def cb():
293            pass
294
295        loop.set_debug(debug)
296        if debug:
297            msg = ("Non-thread-safe operation invoked on an event loop other "
298                  "than the current one")
299            with self.assertRaisesRegex(RuntimeError, msg):
300                loop.call_soon(cb)
301            with self.assertRaisesRegex(RuntimeError, msg):
302                loop.call_later(60, cb)
303            with self.assertRaisesRegex(RuntimeError, msg):
304                loop.call_at(loop.time() + 60, cb)
305        else:
306            loop.call_soon(cb)
307            loop.call_later(60, cb)
308            loop.call_at(loop.time() + 60, cb)
309
310    def test_check_thread(self):
311        def check_in_thread(loop, event, debug, create_loop, fut):
312            # wait until the event loop is running
313            event.wait()
314
315            try:
316                if create_loop:
317                    loop2 = base_events.BaseEventLoop()
318                    try:
319                        asyncio.set_event_loop(loop2)
320                        self.check_thread(loop, debug)
321                    finally:
322                        asyncio.set_event_loop(None)
323                        loop2.close()
324                else:
325                    self.check_thread(loop, debug)
326            except Exception as exc:
327                loop.call_soon_threadsafe(fut.set_exception, exc)
328            else:
329                loop.call_soon_threadsafe(fut.set_result, None)
330
331        def test_thread(loop, debug, create_loop=False):
332            event = threading.Event()
333            fut = asyncio.Future(loop=loop)
334            loop.call_soon(event.set)
335            args = (loop, event, debug, create_loop, fut)
336            thread = threading.Thread(target=check_in_thread, args=args)
337            thread.start()
338            loop.run_until_complete(fut)
339            thread.join()
340
341        self.loop._process_events = mock.Mock()
342        self.loop._write_to_self = mock.Mock()
343
344        # raise RuntimeError if the thread has no event loop
345        test_thread(self.loop, True)
346
347        # check disabled if debug mode is disabled
348        test_thread(self.loop, False)
349
350        # raise RuntimeError if the event loop of the thread is not the called
351        # event loop
352        test_thread(self.loop, True, create_loop=True)
353
354        # check disabled if debug mode is disabled
355        test_thread(self.loop, False, create_loop=True)
356
357    def test_run_once_in_executor_plain(self):
358        def cb():
359            pass
360        f = asyncio.Future(loop=self.loop)
361        executor = mock.Mock()
362        executor.submit.return_value = f
363
364        self.loop.set_default_executor(executor)
365
366        res = self.loop.run_in_executor(None, cb)
367        self.assertIs(f, res)
368
369        executor = mock.Mock()
370        executor.submit.return_value = f
371        res = self.loop.run_in_executor(executor, cb)
372        self.assertIs(f, res)
373        self.assertTrue(executor.submit.called)
374
375        f.cancel()  # Don't complain about abandoned Future.
376
377    def test__run_once(self):
378        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
379                                 self.loop)
380        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
381                                 self.loop)
382
383        h1.cancel()
384
385        self.loop._process_events = mock.Mock()
386        self.loop._scheduled.append(h1)
387        self.loop._scheduled.append(h2)
388        self.loop._run_once()
389
390        t = self.loop._selector.select.call_args[0][0]
391        self.assertTrue(9.5 < t < 10.5, t)
392        self.assertEqual([h2], self.loop._scheduled)
393        self.assertTrue(self.loop._process_events.called)
394
395    def test_set_debug(self):
396        self.loop.set_debug(True)
397        self.assertTrue(self.loop.get_debug())
398        self.loop.set_debug(False)
399        self.assertFalse(self.loop.get_debug())
400
401    @mock.patch('asyncio.base_events.logger')
402    def test__run_once_logging(self, m_logger):
403        def slow_select(timeout):
404            # Sleep a bit longer than a second to avoid timer resolution
405            # issues.
406            time.sleep(1.1)
407            return []
408
409        # logging needs debug flag
410        self.loop.set_debug(True)
411
412        # Log to INFO level if timeout > 1.0 sec.
413        self.loop._selector.select = slow_select
414        self.loop._process_events = mock.Mock()
415        self.loop._run_once()
416        self.assertEqual(logging.INFO, m_logger.log.call_args[0][0])
417
418        def fast_select(timeout):
419            time.sleep(0.001)
420            return []
421
422        self.loop._selector.select = fast_select
423        self.loop._run_once()
424        self.assertEqual(logging.DEBUG, m_logger.log.call_args[0][0])
425
426    def test__run_once_schedule_handle(self):
427        handle = None
428        processed = False
429
430        def cb(loop):
431            nonlocal processed, handle
432            processed = True
433            handle = loop.call_soon(lambda: True)
434
435        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
436                                self.loop)
437
438        self.loop._process_events = mock.Mock()
439        self.loop._scheduled.append(h)
440        self.loop._run_once()
441
442        self.assertTrue(processed)
443        self.assertEqual([handle], list(self.loop._ready))
444
445    def test__run_once_cancelled_event_cleanup(self):
446        self.loop._process_events = mock.Mock()
447
448        self.assertTrue(
449            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
450
451        def cb():
452            pass
453
454        # Set up one "blocking" event that will not be cancelled to
455        # ensure later cancelled events do not make it to the head
456        # of the queue and get cleaned.
457        not_cancelled_count = 1
458        self.loop.call_later(3000, cb)
459
460        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
461        # cancelled handles, ensure they aren't removed
462
463        cancelled_count = 2
464        for x in range(2):
465            h = self.loop.call_later(3600, cb)
466            h.cancel()
467
468        # Add some cancelled events that will be at head and removed
469        cancelled_count += 2
470        for x in range(2):
471            h = self.loop.call_later(100, cb)
472            h.cancel()
473
474        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
475        self.assertLessEqual(cancelled_count + not_cancelled_count,
476            base_events._MIN_SCHEDULED_TIMER_HANDLES)
477
478        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
479
480        self.loop._run_once()
481
482        cancelled_count -= 2
483
484        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
485
486        self.assertEqual(len(self.loop._scheduled),
487            cancelled_count + not_cancelled_count)
488
489        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
490        # so that deletion of cancelled events will occur on next _run_once
491        add_cancel_count = int(math.ceil(
492            base_events._MIN_SCHEDULED_TIMER_HANDLES *
493            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
494
495        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
496            add_cancel_count, 0)
497
498        # Add some events that will not be cancelled
499        not_cancelled_count += add_not_cancel_count
500        for x in range(add_not_cancel_count):
501            self.loop.call_later(3600, cb)
502
503        # Add enough cancelled events
504        cancelled_count += add_cancel_count
505        for x in range(add_cancel_count):
506            h = self.loop.call_later(3600, cb)
507            h.cancel()
508
509        # Ensure all handles are still scheduled
510        self.assertEqual(len(self.loop._scheduled),
511            cancelled_count + not_cancelled_count)
512
513        self.loop._run_once()
514
515        # Ensure cancelled events were removed
516        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
517
518        # Ensure only uncancelled events remain scheduled
519        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
520
521    def test_run_until_complete_type_error(self):
522        self.assertRaises(TypeError,
523            self.loop.run_until_complete, 'blah')
524
525    def test_run_until_complete_loop(self):
526        task = asyncio.Future(loop=self.loop)
527        other_loop = self.new_test_loop()
528        self.addCleanup(other_loop.close)
529        self.assertRaises(ValueError,
530            other_loop.run_until_complete, task)
531
532    def test_subprocess_exec_invalid_args(self):
533        args = [sys.executable, '-c', 'pass']
534
535        # missing program parameter (empty args)
536        self.assertRaises(TypeError,
537            self.loop.run_until_complete, self.loop.subprocess_exec,
538            asyncio.SubprocessProtocol)
539
540        # expected multiple arguments, not a list
541        self.assertRaises(TypeError,
542            self.loop.run_until_complete, self.loop.subprocess_exec,
543            asyncio.SubprocessProtocol, args)
544
545        # program arguments must be strings, not int
546        self.assertRaises(TypeError,
547            self.loop.run_until_complete, self.loop.subprocess_exec,
548            asyncio.SubprocessProtocol, sys.executable, 123)
549
550        # universal_newlines, shell, bufsize must not be set
551        self.assertRaises(TypeError,
552        self.loop.run_until_complete, self.loop.subprocess_exec,
553            asyncio.SubprocessProtocol, *args, universal_newlines=True)
554        self.assertRaises(TypeError,
555            self.loop.run_until_complete, self.loop.subprocess_exec,
556            asyncio.SubprocessProtocol, *args, shell=True)
557        self.assertRaises(TypeError,
558            self.loop.run_until_complete, self.loop.subprocess_exec,
559            asyncio.SubprocessProtocol, *args, bufsize=4096)
560
561    def test_subprocess_shell_invalid_args(self):
562        # expected a string, not an int or a list
563        self.assertRaises(TypeError,
564            self.loop.run_until_complete, self.loop.subprocess_shell,
565            asyncio.SubprocessProtocol, 123)
566        self.assertRaises(TypeError,
567            self.loop.run_until_complete, self.loop.subprocess_shell,
568            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
569
570        # universal_newlines, shell, bufsize must not be set
571        self.assertRaises(TypeError,
572            self.loop.run_until_complete, self.loop.subprocess_shell,
573            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
574        self.assertRaises(TypeError,
575            self.loop.run_until_complete, self.loop.subprocess_shell,
576            asyncio.SubprocessProtocol, 'exit 0', shell=True)
577        self.assertRaises(TypeError,
578            self.loop.run_until_complete, self.loop.subprocess_shell,
579            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
580
581    def test_default_exc_handler_callback(self):
582        self.loop._process_events = mock.Mock()
583
584        def zero_error(fut):
585            fut.set_result(True)
586            1/0
587
588        # Test call_soon (events.Handle)
589        with mock.patch('asyncio.base_events.logger') as log:
590            fut = asyncio.Future(loop=self.loop)
591            self.loop.call_soon(zero_error, fut)
592            fut.add_done_callback(lambda fut: self.loop.stop())
593            self.loop.run_forever()
594            log.error.assert_called_with(
595                test_utils.MockPattern('Exception in callback.*zero'),
596                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
597
598        # Test call_later (events.TimerHandle)
599        with mock.patch('asyncio.base_events.logger') as log:
600            fut = asyncio.Future(loop=self.loop)
601            self.loop.call_later(0.01, zero_error, fut)
602            fut.add_done_callback(lambda fut: self.loop.stop())
603            self.loop.run_forever()
604            log.error.assert_called_with(
605                test_utils.MockPattern('Exception in callback.*zero'),
606                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
607
608    def test_default_exc_handler_coro(self):
609        self.loop._process_events = mock.Mock()
610
611        @asyncio.coroutine
612        def zero_error_coro():
613            yield from asyncio.sleep(0.01, loop=self.loop)
614            1/0
615
616        # Test Future.__del__
617        with mock.patch('asyncio.base_events.logger') as log:
618            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
619            fut.add_done_callback(lambda *args: self.loop.stop())
620            self.loop.run_forever()
621            fut = None # Trigger Future.__del__ or futures._TracebackLogger
622            support.gc_collect()
623            if PY34:
624                # Future.__del__ in Python 3.4 logs error with
625                # an actual exception context
626                log.error.assert_called_with(
627                    test_utils.MockPattern('.*exception was never retrieved'),
628                    exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
629            else:
630                # futures._TracebackLogger logs only textual traceback
631                log.error.assert_called_with(
632                    test_utils.MockPattern(
633                        '.*exception was never retrieved.*ZeroDiv'),
634                    exc_info=False)
635
636    def test_set_exc_handler_invalid(self):
637        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
638            self.loop.set_exception_handler('spam')
639
640    def test_set_exc_handler_custom(self):
641        def zero_error():
642            1/0
643
644        def run_loop():
645            handle = self.loop.call_soon(zero_error)
646            self.loop._run_once()
647            return handle
648
649        self.loop.set_debug(True)
650        self.loop._process_events = mock.Mock()
651
652        self.assertIsNone(self.loop.get_exception_handler())
653        mock_handler = mock.Mock()
654        self.loop.set_exception_handler(mock_handler)
655        self.assertIs(self.loop.get_exception_handler(), mock_handler)
656        handle = run_loop()
657        mock_handler.assert_called_with(self.loop, {
658            'exception': MOCK_ANY,
659            'message': test_utils.MockPattern(
660                                'Exception in callback.*zero_error'),
661            'handle': handle,
662            'source_traceback': handle._source_traceback,
663        })
664        mock_handler.reset_mock()
665
666        self.loop.set_exception_handler(None)
667        with mock.patch('asyncio.base_events.logger') as log:
668            run_loop()
669            log.error.assert_called_with(
670                        test_utils.MockPattern(
671                                'Exception in callback.*zero'),
672                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
673
674        assert not mock_handler.called
675
676    def test_set_exc_handler_broken(self):
677        def run_loop():
678            def zero_error():
679                1/0
680            self.loop.call_soon(zero_error)
681            self.loop._run_once()
682
683        def handler(loop, context):
684            raise AttributeError('spam')
685
686        self.loop._process_events = mock.Mock()
687
688        self.loop.set_exception_handler(handler)
689
690        with mock.patch('asyncio.base_events.logger') as log:
691            run_loop()
692            log.error.assert_called_with(
693                test_utils.MockPattern(
694                    'Unhandled error in exception handler'),
695                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
696
697    def test_default_exc_handler_broken(self):
698        _context = None
699
700        class Loop(base_events.BaseEventLoop):
701
702            _selector = mock.Mock()
703            _process_events = mock.Mock()
704
705            def default_exception_handler(self, context):
706                nonlocal _context
707                _context = context
708                # Simulates custom buggy "default_exception_handler"
709                raise ValueError('spam')
710
711        loop = Loop()
712        self.addCleanup(loop.close)
713        asyncio.set_event_loop(loop)
714
715        def run_loop():
716            def zero_error():
717                1/0
718            loop.call_soon(zero_error)
719            loop._run_once()
720
721        with mock.patch('asyncio.base_events.logger') as log:
722            run_loop()
723            log.error.assert_called_with(
724                'Exception in default exception handler',
725                exc_info=True)
726
727        def custom_handler(loop, context):
728            raise ValueError('ham')
729
730        _context = None
731        loop.set_exception_handler(custom_handler)
732        with mock.patch('asyncio.base_events.logger') as log:
733            run_loop()
734            log.error.assert_called_with(
735                test_utils.MockPattern('Exception in default exception.*'
736                                       'while handling.*in custom'),
737                exc_info=True)
738
739            # Check that original context was passed to default
740            # exception handler.
741            self.assertIn('context', _context)
742            self.assertIs(type(_context['context']['exception']),
743                          ZeroDivisionError)
744
745    def test_set_task_factory_invalid(self):
746        with self.assertRaisesRegex(
747            TypeError, 'task factory must be a callable or None'):
748
749            self.loop.set_task_factory(1)
750
751        self.assertIsNone(self.loop.get_task_factory())
752
753    def test_set_task_factory(self):
754        self.loop._process_events = mock.Mock()
755
756        class MyTask(asyncio.Task):
757            pass
758
759        @asyncio.coroutine
760        def coro():
761            pass
762
763        factory = lambda loop, coro: MyTask(coro, loop=loop)
764
765        self.assertIsNone(self.loop.get_task_factory())
766        self.loop.set_task_factory(factory)
767        self.assertIs(self.loop.get_task_factory(), factory)
768
769        task = self.loop.create_task(coro())
770        self.assertTrue(isinstance(task, MyTask))
771        self.loop.run_until_complete(task)
772
773        self.loop.set_task_factory(None)
774        self.assertIsNone(self.loop.get_task_factory())
775
776        task = self.loop.create_task(coro())
777        self.assertTrue(isinstance(task, asyncio.Task))
778        self.assertFalse(isinstance(task, MyTask))
779        self.loop.run_until_complete(task)
780
781    def test_env_var_debug(self):
782        code = '\n'.join((
783            'import asyncio',
784            'loop = asyncio.get_event_loop()',
785            'print(loop.get_debug())'))
786
787        # Test with -E to not fail if the unit test was run with
788        # PYTHONASYNCIODEBUG set to a non-empty string
789        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
790        self.assertEqual(stdout.rstrip(), b'False')
791
792        sts, stdout, stderr = assert_python_ok('-c', code,
793                                               PYTHONASYNCIODEBUG='')
794        self.assertEqual(stdout.rstrip(), b'False')
795
796        sts, stdout, stderr = assert_python_ok('-c', code,
797                                               PYTHONASYNCIODEBUG='1')
798        self.assertEqual(stdout.rstrip(), b'True')
799
800        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
801                                               PYTHONASYNCIODEBUG='1')
802        self.assertEqual(stdout.rstrip(), b'False')
803
804    def test_create_task(self):
805        class MyTask(asyncio.Task):
806            pass
807
808        @asyncio.coroutine
809        def test():
810            pass
811
812        class EventLoop(base_events.BaseEventLoop):
813            def create_task(self, coro):
814                return MyTask(coro, loop=loop)
815
816        loop = EventLoop()
817        self.set_event_loop(loop)
818
819        coro = test()
820        task = asyncio.ensure_future(coro, loop=loop)
821        self.assertIsInstance(task, MyTask)
822
823        # make warnings quiet
824        task._log_destroy_pending = False
825        coro.close()
826
827    def test_run_forever_keyboard_interrupt(self):
828        # Python issue #22601: ensure that the temporary task created by
829        # run_forever() consumes the KeyboardInterrupt and so don't log
830        # a warning
831        @asyncio.coroutine
832        def raise_keyboard_interrupt():
833            raise KeyboardInterrupt
834
835        self.loop._process_events = mock.Mock()
836        self.loop.call_exception_handler = mock.Mock()
837
838        try:
839            self.loop.run_until_complete(raise_keyboard_interrupt())
840        except KeyboardInterrupt:
841            pass
842        self.loop.close()
843        support.gc_collect()
844
845        self.assertFalse(self.loop.call_exception_handler.called)
846
847    def test_run_until_complete_baseexception(self):
848        # Python issue #22429: run_until_complete() must not schedule a pending
849        # call to stop() if the future raised a BaseException
850        @asyncio.coroutine
851        def raise_keyboard_interrupt():
852            raise KeyboardInterrupt
853
854        self.loop._process_events = mock.Mock()
855
856        try:
857            self.loop.run_until_complete(raise_keyboard_interrupt())
858        except KeyboardInterrupt:
859            pass
860
861        def func():
862            self.loop.stop()
863            func.called = True
864        func.called = False
865        try:
866            self.loop.call_soon(func)
867            self.loop.run_forever()
868        except KeyboardInterrupt:
869            pass
870        self.assertTrue(func.called)
871
872    def test_single_selecter_event_callback_after_stopping(self):
873        # Python issue #25593: A stopped event loop may cause event callbacks
874        # to run more than once.
875        event_sentinel = object()
876        callcount = 0
877        doer = None
878
879        def proc_events(event_list):
880            nonlocal doer
881            if event_sentinel in event_list:
882                doer = self.loop.call_soon(do_event)
883
884        def do_event():
885            nonlocal callcount
886            callcount += 1
887            self.loop.call_soon(clear_selector)
888
889        def clear_selector():
890            doer.cancel()
891            self.loop._selector.select.return_value = ()
892
893        self.loop._process_events = proc_events
894        self.loop._selector.select.return_value = (event_sentinel,)
895
896        for i in range(1, 3):
897            with self.subTest('Loop %d/2' % i):
898                self.loop.call_soon(self.loop.stop)
899                self.loop.run_forever()
900                self.assertEqual(callcount, 1)
901
902    def test_run_once(self):
903        # Simple test for test_utils.run_once().  It may seem strange
904        # to have a test for this (the function isn't even used!) but
905        # it's a de-factor standard API for library tests.  This tests
906        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
907        count = 0
908
909        def callback():
910            nonlocal count
911            count += 1
912
913        self.loop._process_events = mock.Mock()
914        self.loop.call_soon(callback)
915        test_utils.run_once(self.loop)
916        self.assertEqual(count, 1)
917
918    def test_run_forever_pre_stopped(self):
919        # Test that the old idiom for pre-stopping the loop works.
920        self.loop._process_events = mock.Mock()
921        self.loop.stop()
922        self.loop.run_forever()
923        self.loop._selector.select.assert_called_once_with(0)
924
925
926class MyProto(asyncio.Protocol):
927    done = None
928
929    def __init__(self, create_future=False):
930        self.state = 'INITIAL'
931        self.nbytes = 0
932        if create_future:
933            self.done = asyncio.Future()
934
935    def connection_made(self, transport):
936        self.transport = transport
937        assert self.state == 'INITIAL', self.state
938        self.state = 'CONNECTED'
939        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
940
941    def data_received(self, data):
942        assert self.state == 'CONNECTED', self.state
943        self.nbytes += len(data)
944
945    def eof_received(self):
946        assert self.state == 'CONNECTED', self.state
947        self.state = 'EOF'
948
949    def connection_lost(self, exc):
950        assert self.state in ('CONNECTED', 'EOF'), self.state
951        self.state = 'CLOSED'
952        if self.done:
953            self.done.set_result(None)
954
955
956class MyDatagramProto(asyncio.DatagramProtocol):
957    done = None
958
959    def __init__(self, create_future=False, loop=None):
960        self.state = 'INITIAL'
961        self.nbytes = 0
962        if create_future:
963            self.done = asyncio.Future(loop=loop)
964
965    def connection_made(self, transport):
966        self.transport = transport
967        assert self.state == 'INITIAL', self.state
968        self.state = 'INITIALIZED'
969
970    def datagram_received(self, data, addr):
971        assert self.state == 'INITIALIZED', self.state
972        self.nbytes += len(data)
973
974    def error_received(self, exc):
975        assert self.state == 'INITIALIZED', self.state
976
977    def connection_lost(self, exc):
978        assert self.state == 'INITIALIZED', self.state
979        self.state = 'CLOSED'
980        if self.done:
981            self.done.set_result(None)
982
983
984class BaseEventLoopWithSelectorTests(test_utils.TestCase):
985
986    def setUp(self):
987        super().setUp()
988        self.loop = asyncio.new_event_loop()
989        self.set_event_loop(self.loop)
990
991    @patch_socket
992    def test_create_connection_multiple_errors(self, m_socket):
993
994        class MyProto(asyncio.Protocol):
995            pass
996
997        @asyncio.coroutine
998        def getaddrinfo(*args, **kw):
999            yield from []
1000            return [(2, 1, 6, '', ('107.6.106.82', 80)),
1001                    (2, 1, 6, '', ('107.6.106.82', 80))]
1002
1003        def getaddrinfo_task(*args, **kwds):
1004            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1005
1006        idx = -1
1007        errors = ['err1', 'err2']
1008
1009        def _socket(*args, **kw):
1010            nonlocal idx, errors
1011            idx += 1
1012            raise OSError(errors[idx])
1013
1014        m_socket.socket = _socket
1015
1016        self.loop.getaddrinfo = getaddrinfo_task
1017
1018        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1019        with self.assertRaises(OSError) as cm:
1020            self.loop.run_until_complete(coro)
1021
1022        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1023
1024    @patch_socket
1025    def test_create_connection_timeout(self, m_socket):
1026        # Ensure that the socket is closed on timeout
1027        sock = mock.Mock()
1028        m_socket.socket.return_value = sock
1029
1030        def getaddrinfo(*args, **kw):
1031            fut = asyncio.Future(loop=self.loop)
1032            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1033                    ('127.0.0.1', 80))
1034            fut.set_result([addr])
1035            return fut
1036        self.loop.getaddrinfo = getaddrinfo
1037
1038        with mock.patch.object(self.loop, 'sock_connect',
1039                               side_effect=asyncio.TimeoutError):
1040            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1041            with self.assertRaises(asyncio.TimeoutError):
1042                self.loop.run_until_complete(coro)
1043            self.assertTrue(sock.close.called)
1044
1045    def test_create_connection_host_port_sock(self):
1046        coro = self.loop.create_connection(
1047            MyProto, 'example.com', 80, sock=object())
1048        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1049
1050    def test_create_connection_wrong_sock(self):
1051        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1052        with sock:
1053            coro = self.loop.create_connection(MyProto, sock=sock)
1054            with self.assertRaisesRegex(ValueError,
1055                                        'A Stream Socket was expected'):
1056                self.loop.run_until_complete(coro)
1057
1058    def test_create_server_wrong_sock(self):
1059        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1060        with sock:
1061            coro = self.loop.create_server(MyProto, sock=sock)
1062            with self.assertRaisesRegex(ValueError,
1063                                        'A Stream Socket was expected'):
1064                self.loop.run_until_complete(coro)
1065
1066    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1067                         'no socket.SOCK_NONBLOCK (linux only)')
1068    def test_create_server_stream_bittype(self):
1069        sock = socket.socket(
1070            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1071        with sock:
1072            coro = self.loop.create_server(lambda: None, sock=sock)
1073            srv = self.loop.run_until_complete(coro)
1074            srv.close()
1075            self.loop.run_until_complete(srv.wait_closed())
1076
1077    def test_create_datagram_endpoint_wrong_sock(self):
1078        sock = socket.socket(socket.AF_INET)
1079        with sock:
1080            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1081            with self.assertRaisesRegex(ValueError,
1082                                        'A UDP Socket was expected'):
1083                self.loop.run_until_complete(coro)
1084
1085    def test_create_connection_no_host_port_sock(self):
1086        coro = self.loop.create_connection(MyProto)
1087        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1088
1089    def test_create_connection_no_getaddrinfo(self):
1090        @asyncio.coroutine
1091        def getaddrinfo(*args, **kw):
1092            yield from []
1093
1094        def getaddrinfo_task(*args, **kwds):
1095            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1096
1097        self.loop.getaddrinfo = getaddrinfo_task
1098        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1099        self.assertRaises(
1100            OSError, self.loop.run_until_complete, coro)
1101
1102    def test_create_connection_connect_err(self):
1103        @asyncio.coroutine
1104        def getaddrinfo(*args, **kw):
1105            yield from []
1106            return [(2, 1, 6, '', ('107.6.106.82', 80))]
1107
1108        def getaddrinfo_task(*args, **kwds):
1109            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1110
1111        self.loop.getaddrinfo = getaddrinfo_task
1112        self.loop.sock_connect = mock.Mock()
1113        self.loop.sock_connect.side_effect = OSError
1114
1115        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1116        self.assertRaises(
1117            OSError, self.loop.run_until_complete, coro)
1118
1119    def test_create_connection_multiple(self):
1120        @asyncio.coroutine
1121        def getaddrinfo(*args, **kw):
1122            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1123                    (2, 1, 6, '', ('0.0.0.2', 80))]
1124
1125        def getaddrinfo_task(*args, **kwds):
1126            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1127
1128        self.loop.getaddrinfo = getaddrinfo_task
1129        self.loop.sock_connect = mock.Mock()
1130        self.loop.sock_connect.side_effect = OSError
1131
1132        coro = self.loop.create_connection(
1133            MyProto, 'example.com', 80, family=socket.AF_INET)
1134        with self.assertRaises(OSError):
1135            self.loop.run_until_complete(coro)
1136
1137    @patch_socket
1138    def test_create_connection_multiple_errors_local_addr(self, m_socket):
1139
1140        def bind(addr):
1141            if addr[0] == '0.0.0.1':
1142                err = OSError('Err')
1143                err.strerror = 'Err'
1144                raise err
1145
1146        m_socket.socket.return_value.bind = bind
1147
1148        @asyncio.coroutine
1149        def getaddrinfo(*args, **kw):
1150            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1151                    (2, 1, 6, '', ('0.0.0.2', 80))]
1152
1153        def getaddrinfo_task(*args, **kwds):
1154            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1155
1156        self.loop.getaddrinfo = getaddrinfo_task
1157        self.loop.sock_connect = mock.Mock()
1158        self.loop.sock_connect.side_effect = OSError('Err2')
1159
1160        coro = self.loop.create_connection(
1161            MyProto, 'example.com', 80, family=socket.AF_INET,
1162            local_addr=(None, 8080))
1163        with self.assertRaises(OSError) as cm:
1164            self.loop.run_until_complete(coro)
1165
1166        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1167        self.assertTrue(m_socket.socket.return_value.close.called)
1168
1169    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1170        # Test the fallback code, even if this system has inet_pton.
1171        if not allow_inet_pton:
1172            del m_socket.inet_pton
1173
1174        m_socket.getaddrinfo = socket.getaddrinfo
1175        sock = m_socket.socket.return_value
1176
1177        self.loop._add_reader = mock.Mock()
1178        self.loop._add_reader._is_coroutine = False
1179        self.loop._add_writer = mock.Mock()
1180        self.loop._add_writer._is_coroutine = False
1181
1182        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1183        t, p = self.loop.run_until_complete(coro)
1184        try:
1185            sock.connect.assert_called_with(('1.2.3.4', 80))
1186            _, kwargs = m_socket.socket.call_args
1187            self.assertEqual(kwargs['family'], m_socket.AF_INET)
1188            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1189        finally:
1190            t.close()
1191            test_utils.run_briefly(self.loop)  # allow transport to close
1192
1193        sock.family = socket.AF_INET6
1194        coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1195        t, p = self.loop.run_until_complete(coro)
1196        try:
1197            # Without inet_pton we use getaddrinfo, which transforms ('::1', 80)
1198            # to ('::1', 80, 0, 0). The last 0s are flow info, scope id.
1199            [address] = sock.connect.call_args[0]
1200            host, port = address[:2]
1201            self.assertRegex(host, r'::(0\.)*1')
1202            self.assertEqual(port, 80)
1203            _, kwargs = m_socket.socket.call_args
1204            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1205            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1206        finally:
1207            t.close()
1208            test_utils.run_briefly(self.loop)  # allow transport to close
1209
1210    @patch_socket
1211    def test_create_connection_ip_addr(self, m_socket):
1212        self._test_create_connection_ip_addr(m_socket, True)
1213
1214    @patch_socket
1215    def test_create_connection_no_inet_pton(self, m_socket):
1216        self._test_create_connection_ip_addr(m_socket, False)
1217
1218    @patch_socket
1219    def test_create_connection_service_name(self, m_socket):
1220        m_socket.getaddrinfo = socket.getaddrinfo
1221        sock = m_socket.socket.return_value
1222
1223        self.loop._add_reader = mock.Mock()
1224        self.loop._add_reader._is_coroutine = False
1225        self.loop._add_writer = mock.Mock()
1226        self.loop._add_writer._is_coroutine = False
1227
1228        for service, port in ('http', 80), (b'http', 80):
1229            coro = self.loop.create_connection(asyncio.Protocol,
1230                                               '127.0.0.1', service)
1231
1232            t, p = self.loop.run_until_complete(coro)
1233            try:
1234                sock.connect.assert_called_with(('127.0.0.1', port))
1235                _, kwargs = m_socket.socket.call_args
1236                self.assertEqual(kwargs['family'], m_socket.AF_INET)
1237                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1238            finally:
1239                t.close()
1240                test_utils.run_briefly(self.loop)  # allow transport to close
1241
1242        for service in 'nonsense', b'nonsense':
1243            coro = self.loop.create_connection(asyncio.Protocol,
1244                                               '127.0.0.1', service)
1245
1246            with self.assertRaises(OSError):
1247                self.loop.run_until_complete(coro)
1248
1249    def test_create_connection_no_local_addr(self):
1250        @asyncio.coroutine
1251        def getaddrinfo(host, *args, **kw):
1252            if host == 'example.com':
1253                return [(2, 1, 6, '', ('107.6.106.82', 80)),
1254                        (2, 1, 6, '', ('107.6.106.82', 80))]
1255            else:
1256                return []
1257
1258        def getaddrinfo_task(*args, **kwds):
1259            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1260        self.loop.getaddrinfo = getaddrinfo_task
1261
1262        coro = self.loop.create_connection(
1263            MyProto, 'example.com', 80, family=socket.AF_INET,
1264            local_addr=(None, 8080))
1265        self.assertRaises(
1266            OSError, self.loop.run_until_complete, coro)
1267
1268    @patch_socket
1269    def test_create_connection_bluetooth(self, m_socket):
1270        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1271        # we can't recognize an address is resolved, e.g. a Bluetooth address.
1272        addr = ('00:01:02:03:04:05', 1)
1273
1274        def getaddrinfo(host, port, *args, **kw):
1275            assert (host, port) == addr
1276            return [(999, 1, 999, '', (addr, 1))]
1277
1278        m_socket.getaddrinfo = getaddrinfo
1279        sock = m_socket.socket()
1280        coro = self.loop.sock_connect(sock, addr)
1281        self.loop.run_until_complete(coro)
1282
1283    def test_create_connection_ssl_server_hostname_default(self):
1284        self.loop.getaddrinfo = mock.Mock()
1285
1286        def mock_getaddrinfo(*args, **kwds):
1287            f = asyncio.Future(loop=self.loop)
1288            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1289                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
1290            return f
1291
1292        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1293        self.loop.sock_connect = mock.Mock()
1294        self.loop.sock_connect.return_value = ()
1295        self.loop._make_ssl_transport = mock.Mock()
1296
1297        class _SelectorTransportMock:
1298            _sock = None
1299
1300            def get_extra_info(self, key):
1301                return mock.Mock()
1302
1303            def close(self):
1304                self._sock.close()
1305
1306        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1307                                    **kwds):
1308            waiter.set_result(None)
1309            transport = _SelectorTransportMock()
1310            transport._sock = sock
1311            return transport
1312
1313        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1314        ANY = mock.ANY
1315        # First try the default server_hostname.
1316        self.loop._make_ssl_transport.reset_mock()
1317        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True)
1318        transport, _ = self.loop.run_until_complete(coro)
1319        transport.close()
1320        self.loop._make_ssl_transport.assert_called_with(
1321            ANY, ANY, ANY, ANY,
1322            server_side=False,
1323            server_hostname='python.org')
1324        # Next try an explicit server_hostname.
1325        self.loop._make_ssl_transport.reset_mock()
1326        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
1327                                           server_hostname='perl.com')
1328        transport, _ = self.loop.run_until_complete(coro)
1329        transport.close()
1330        self.loop._make_ssl_transport.assert_called_with(
1331            ANY, ANY, ANY, ANY,
1332            server_side=False,
1333            server_hostname='perl.com')
1334        # Finally try an explicit empty server_hostname.
1335        self.loop._make_ssl_transport.reset_mock()
1336        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
1337                                           server_hostname='')
1338        transport, _ = self.loop.run_until_complete(coro)
1339        transport.close()
1340        self.loop._make_ssl_transport.assert_called_with(ANY, ANY, ANY, ANY,
1341                                                         server_side=False,
1342                                                         server_hostname='')
1343
1344    def test_create_connection_no_ssl_server_hostname_errors(self):
1345        # When not using ssl, server_hostname must be None.
1346        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1347                                           server_hostname='')
1348        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1349        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1350                                           server_hostname='python.org')
1351        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1352
1353    def test_create_connection_ssl_server_hostname_errors(self):
1354        # When using ssl, server_hostname may be None if host is non-empty.
1355        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1356        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1357        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1358        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1359        sock = socket.socket()
1360        coro = self.loop.create_connection(MyProto, None, None,
1361                                           ssl=True, sock=sock)
1362        self.addCleanup(sock.close)
1363        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1364
1365    def test_create_server_empty_host(self):
1366        # if host is empty string use None instead
1367        host = object()
1368
1369        @asyncio.coroutine
1370        def getaddrinfo(*args, **kw):
1371            nonlocal host
1372            host = args[0]
1373            yield from []
1374
1375        def getaddrinfo_task(*args, **kwds):
1376            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1377
1378        self.loop.getaddrinfo = getaddrinfo_task
1379        fut = self.loop.create_server(MyProto, '', 0)
1380        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1381        self.assertIsNone(host)
1382
1383    def test_create_server_host_port_sock(self):
1384        fut = self.loop.create_server(
1385            MyProto, '0.0.0.0', 0, sock=object())
1386        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1387
1388    def test_create_server_no_host_port_sock(self):
1389        fut = self.loop.create_server(MyProto)
1390        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1391
1392    def test_create_server_no_getaddrinfo(self):
1393        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1394        getaddrinfo.return_value = []
1395
1396        f = self.loop.create_server(MyProto, 'python.org', 0)
1397        self.assertRaises(OSError, self.loop.run_until_complete, f)
1398
1399    @patch_socket
1400    def test_create_server_nosoreuseport(self, m_socket):
1401        m_socket.getaddrinfo = socket.getaddrinfo
1402        del m_socket.SO_REUSEPORT
1403        m_socket.socket.return_value = mock.Mock()
1404
1405        f = self.loop.create_server(
1406            MyProto, '0.0.0.0', 0, reuse_port=True)
1407
1408        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1409
1410    @patch_socket
1411    def test_create_server_soreuseport_only_defined(self, m_socket):
1412        m_socket.getaddrinfo = socket.getaddrinfo
1413        m_socket.socket.return_value = mock.Mock()
1414        m_socket.SO_REUSEPORT = -1
1415
1416        f = self.loop.create_server(
1417            MyProto, '0.0.0.0', 0, reuse_port=True)
1418
1419        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1420
1421    @patch_socket
1422    def test_create_server_cant_bind(self, m_socket):
1423
1424        class Err(OSError):
1425            strerror = 'error'
1426
1427        m_socket.getaddrinfo.return_value = [
1428            (2, 1, 6, '', ('127.0.0.1', 10100))]
1429        m_socket.getaddrinfo._is_coroutine = False
1430        m_sock = m_socket.socket.return_value = mock.Mock()
1431        m_sock.bind.side_effect = Err
1432
1433        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1434        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1435        self.assertTrue(m_sock.close.called)
1436
1437    @patch_socket
1438    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1439        m_socket.getaddrinfo.return_value = []
1440        m_socket.getaddrinfo._is_coroutine = False
1441
1442        coro = self.loop.create_datagram_endpoint(
1443            MyDatagramProto, local_addr=('localhost', 0))
1444        self.assertRaises(
1445            OSError, self.loop.run_until_complete, coro)
1446
1447    def test_create_datagram_endpoint_addr_error(self):
1448        coro = self.loop.create_datagram_endpoint(
1449            MyDatagramProto, local_addr='localhost')
1450        self.assertRaises(
1451            AssertionError, self.loop.run_until_complete, coro)
1452        coro = self.loop.create_datagram_endpoint(
1453            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1454        self.assertRaises(
1455            AssertionError, self.loop.run_until_complete, coro)
1456
1457    def test_create_datagram_endpoint_connect_err(self):
1458        self.loop.sock_connect = mock.Mock()
1459        self.loop.sock_connect.side_effect = OSError
1460
1461        coro = self.loop.create_datagram_endpoint(
1462            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1463        self.assertRaises(
1464            OSError, self.loop.run_until_complete, coro)
1465
1466    @patch_socket
1467    def test_create_datagram_endpoint_socket_err(self, m_socket):
1468        m_socket.getaddrinfo = socket.getaddrinfo
1469        m_socket.socket.side_effect = OSError
1470
1471        coro = self.loop.create_datagram_endpoint(
1472            asyncio.DatagramProtocol, family=socket.AF_INET)
1473        self.assertRaises(
1474            OSError, self.loop.run_until_complete, coro)
1475
1476        coro = self.loop.create_datagram_endpoint(
1477            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1478        self.assertRaises(
1479            OSError, self.loop.run_until_complete, coro)
1480
1481    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1482    def test_create_datagram_endpoint_no_matching_family(self):
1483        coro = self.loop.create_datagram_endpoint(
1484            asyncio.DatagramProtocol,
1485            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1486        self.assertRaises(
1487            ValueError, self.loop.run_until_complete, coro)
1488
1489    @patch_socket
1490    def test_create_datagram_endpoint_setblk_err(self, m_socket):
1491        m_socket.socket.return_value.setblocking.side_effect = OSError
1492
1493        coro = self.loop.create_datagram_endpoint(
1494            asyncio.DatagramProtocol, family=socket.AF_INET)
1495        self.assertRaises(
1496            OSError, self.loop.run_until_complete, coro)
1497        self.assertTrue(
1498            m_socket.socket.return_value.close.called)
1499
1500    def test_create_datagram_endpoint_noaddr_nofamily(self):
1501        coro = self.loop.create_datagram_endpoint(
1502            asyncio.DatagramProtocol)
1503        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1504
1505    @patch_socket
1506    def test_create_datagram_endpoint_cant_bind(self, m_socket):
1507        class Err(OSError):
1508            pass
1509
1510        m_socket.getaddrinfo = socket.getaddrinfo
1511        m_sock = m_socket.socket.return_value = mock.Mock()
1512        m_sock.bind.side_effect = Err
1513
1514        fut = self.loop.create_datagram_endpoint(
1515            MyDatagramProto,
1516            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1517        self.assertRaises(Err, self.loop.run_until_complete, fut)
1518        self.assertTrue(m_sock.close.called)
1519
1520    def test_create_datagram_endpoint_sock(self):
1521        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1522        sock.bind(('127.0.0.1', 0))
1523        fut = self.loop.create_datagram_endpoint(
1524            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1525            sock=sock)
1526        transport, protocol = self.loop.run_until_complete(fut)
1527        transport.close()
1528        self.loop.run_until_complete(protocol.done)
1529        self.assertEqual('CLOSED', protocol.state)
1530
1531    def test_create_datagram_endpoint_sock_sockopts(self):
1532        class FakeSock:
1533            type = socket.SOCK_DGRAM
1534
1535        fut = self.loop.create_datagram_endpoint(
1536            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1537        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1538
1539        fut = self.loop.create_datagram_endpoint(
1540            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1541        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1542
1543        fut = self.loop.create_datagram_endpoint(
1544            MyDatagramProto, family=1, sock=FakeSock())
1545        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1546
1547        fut = self.loop.create_datagram_endpoint(
1548            MyDatagramProto, proto=1, sock=FakeSock())
1549        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1550
1551        fut = self.loop.create_datagram_endpoint(
1552            MyDatagramProto, flags=1, sock=FakeSock())
1553        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1554
1555        fut = self.loop.create_datagram_endpoint(
1556            MyDatagramProto, reuse_address=True, sock=FakeSock())
1557        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1558
1559        fut = self.loop.create_datagram_endpoint(
1560            MyDatagramProto, reuse_port=True, sock=FakeSock())
1561        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1562
1563        fut = self.loop.create_datagram_endpoint(
1564            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1565        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1566
1567    def test_create_datagram_endpoint_sockopts(self):
1568        # Socket options should not be applied unless asked for.
1569        # SO_REUSEADDR defaults to on for UNIX.
1570        # SO_REUSEPORT is not available on all platforms.
1571
1572        coro = self.loop.create_datagram_endpoint(
1573            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1574            local_addr=('127.0.0.1', 0))
1575        transport, protocol = self.loop.run_until_complete(coro)
1576        sock = transport.get_extra_info('socket')
1577
1578        reuse_address_default_on = (
1579            os.name == 'posix' and sys.platform != 'cygwin')
1580        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1581
1582        if reuse_address_default_on:
1583            self.assertTrue(
1584                sock.getsockopt(
1585                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
1586        else:
1587            self.assertFalse(
1588                sock.getsockopt(
1589                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
1590        if reuseport_supported:
1591            self.assertFalse(
1592                sock.getsockopt(
1593                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1594        self.assertFalse(
1595            sock.getsockopt(
1596                socket.SOL_SOCKET, socket.SO_BROADCAST))
1597
1598        transport.close()
1599        self.loop.run_until_complete(protocol.done)
1600        self.assertEqual('CLOSED', protocol.state)
1601
1602        coro = self.loop.create_datagram_endpoint(
1603            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1604            local_addr=('127.0.0.1', 0),
1605            reuse_address=True,
1606            reuse_port=reuseport_supported,
1607            allow_broadcast=True)
1608        transport, protocol = self.loop.run_until_complete(coro)
1609        sock = transport.get_extra_info('socket')
1610
1611        self.assertTrue(
1612            sock.getsockopt(
1613                socket.SOL_SOCKET, socket.SO_REUSEADDR))
1614        if reuseport_supported:
1615            self.assertTrue(
1616                sock.getsockopt(
1617                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1618        self.assertTrue(
1619            sock.getsockopt(
1620                socket.SOL_SOCKET, socket.SO_BROADCAST))
1621
1622        transport.close()
1623        self.loop.run_until_complete(protocol.done)
1624        self.assertEqual('CLOSED', protocol.state)
1625
1626    @patch_socket
1627    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1628        del m_socket.SO_REUSEPORT
1629        m_socket.socket.return_value = mock.Mock()
1630
1631        coro = self.loop.create_datagram_endpoint(
1632            lambda: MyDatagramProto(loop=self.loop),
1633            local_addr=('127.0.0.1', 0),
1634            reuse_address=False,
1635            reuse_port=True)
1636
1637        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1638
1639    @patch_socket
1640    def test_create_datagram_endpoint_ip_addr(self, m_socket):
1641        def getaddrinfo(*args, **kw):
1642            self.fail('should not have called getaddrinfo')
1643
1644        m_socket.getaddrinfo = getaddrinfo
1645        m_socket.socket.return_value.bind = bind = mock.Mock()
1646        self.loop._add_reader = mock.Mock()
1647        self.loop._add_reader._is_coroutine = False
1648
1649        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1650        coro = self.loop.create_datagram_endpoint(
1651            lambda: MyDatagramProto(loop=self.loop),
1652            local_addr=('1.2.3.4', 0),
1653            reuse_address=False,
1654            reuse_port=reuseport_supported)
1655
1656        t, p = self.loop.run_until_complete(coro)
1657        try:
1658            bind.assert_called_with(('1.2.3.4', 0))
1659            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1660                                               proto=m_socket.IPPROTO_UDP,
1661                                               type=m_socket.SOCK_DGRAM)
1662        finally:
1663            t.close()
1664            test_utils.run_briefly(self.loop)  # allow transport to close
1665
1666    def test_accept_connection_retry(self):
1667        sock = mock.Mock()
1668        sock.accept.side_effect = BlockingIOError()
1669
1670        self.loop._accept_connection(MyProto, sock)
1671        self.assertFalse(sock.close.called)
1672
1673    @mock.patch('asyncio.base_events.logger')
1674    def test_accept_connection_exception(self, m_log):
1675        sock = mock.Mock()
1676        sock.fileno.return_value = 10
1677        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1678        self.loop._remove_reader = mock.Mock()
1679        self.loop.call_later = mock.Mock()
1680
1681        self.loop._accept_connection(MyProto, sock)
1682        self.assertTrue(m_log.error.called)
1683        self.assertFalse(sock.close.called)
1684        self.loop._remove_reader.assert_called_with(10)
1685        self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
1686                                                # self.loop._start_serving
1687                                                mock.ANY,
1688                                                MyProto, sock, None, None, mock.ANY)
1689
1690    def test_call_coroutine(self):
1691        @asyncio.coroutine
1692        def simple_coroutine():
1693            pass
1694
1695        self.loop.set_debug(True)
1696        coro_func = simple_coroutine
1697        coro_obj = coro_func()
1698        self.addCleanup(coro_obj.close)
1699        for func in (coro_func, coro_obj):
1700            with self.assertRaises(TypeError):
1701                self.loop.call_soon(func)
1702            with self.assertRaises(TypeError):
1703                self.loop.call_soon_threadsafe(func)
1704            with self.assertRaises(TypeError):
1705                self.loop.call_later(60, func)
1706            with self.assertRaises(TypeError):
1707                self.loop.call_at(self.loop.time() + 60, func)
1708            with self.assertRaises(TypeError):
1709                self.loop.run_in_executor(None, func)
1710
1711    @mock.patch('asyncio.base_events.logger')
1712    def test_log_slow_callbacks(self, m_logger):
1713        def stop_loop_cb(loop):
1714            loop.stop()
1715
1716        @asyncio.coroutine
1717        def stop_loop_coro(loop):
1718            yield from ()
1719            loop.stop()
1720
1721        asyncio.set_event_loop(self.loop)
1722        self.loop.set_debug(True)
1723        self.loop.slow_callback_duration = 0.0
1724
1725        # slow callback
1726        self.loop.call_soon(stop_loop_cb, self.loop)
1727        self.loop.run_forever()
1728        fmt, *args = m_logger.warning.call_args[0]
1729        self.assertRegex(fmt % tuple(args),
1730                         "^Executing <Handle.*stop_loop_cb.*> "
1731                         "took .* seconds$")
1732
1733        # slow task
1734        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1735        self.loop.run_forever()
1736        fmt, *args = m_logger.warning.call_args[0]
1737        self.assertRegex(fmt % tuple(args),
1738                         "^Executing <Task.*stop_loop_coro.*> "
1739                         "took .* seconds$")
1740
1741
1742class RunningLoopTests(unittest.TestCase):
1743
1744    def test_running_loop_within_a_loop(self):
1745        @asyncio.coroutine
1746        def runner(loop):
1747            loop.run_forever()
1748
1749        loop = asyncio.new_event_loop()
1750        outer_loop = asyncio.new_event_loop()
1751        try:
1752            with self.assertRaisesRegex(RuntimeError,
1753                                        'while another loop is running'):
1754                outer_loop.run_until_complete(runner(loop))
1755        finally:
1756            loop.close()
1757            outer_loop.close()
1758
1759
1760if __name__ == '__main__':
1761    unittest.main()
1762