• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for base_events.py"""
2
3import errno
4import logging
5import math
6import os
7import socket
8import sys
9import threading
10import time
11import unittest
12from unittest import mock
13
14import asyncio
15from asyncio import base_events
16from asyncio import constants
17from asyncio import events
18from test.test_asyncio import utils as test_utils
19from test import support
20from test.support.script_helper import assert_python_ok
21
22
23MOCK_ANY = mock.ANY
24PY34 = sys.version_info >= (3, 4)
25
26
27def mock_socket_module():
28    m_socket = mock.MagicMock(spec=socket)
29    for name in (
30        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
31        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
32    ):
33        if hasattr(socket, name):
34            setattr(m_socket, name, getattr(socket, name))
35        else:
36            delattr(m_socket, name)
37
38    m_socket.socket = mock.MagicMock()
39    m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
40    m_socket.getaddrinfo._is_coroutine = False
41
42    return m_socket
43
44
45def patch_socket(f):
46    return mock.patch('asyncio.base_events.socket',
47                      new_callable=mock_socket_module)(f)
48
49
50class BaseEventTests(test_utils.TestCase):
51
52    def test_ipaddr_info(self):
53        UNSPEC = socket.AF_UNSPEC
54        INET = socket.AF_INET
55        INET6 = socket.AF_INET6
56        STREAM = socket.SOCK_STREAM
57        DGRAM = socket.SOCK_DGRAM
58        TCP = socket.IPPROTO_TCP
59        UDP = socket.IPPROTO_UDP
60
61        self.assertEqual(
62            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
63            base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))
64
65        self.assertEqual(
66            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
67            base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))
68
69        self.assertEqual(
70            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
71            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))
72
73        self.assertEqual(
74            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
75            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))
76
77        # Socket type STREAM implies TCP protocol.
78        self.assertEqual(
79            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
80            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))
81
82        # Socket type DGRAM implies UDP protocol.
83        self.assertEqual(
84            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
85            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))
86
87        # No socket type.
88        self.assertIsNone(
89            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
90
91        # IPv4 address with family IPv6.
92        self.assertIsNone(
93            base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
94
95        self.assertEqual(
96            (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
97            base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))
98
99        self.assertEqual(
100            (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
101            base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))
102
103        # IPv6 address with family IPv4.
104        self.assertIsNone(
105            base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))
106
107        # IPv6 address with zone index.
108        self.assertIsNone(
109            base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))
110
111    def test_port_parameter_types(self):
112        # Test obscure kinds of arguments for "port".
113        INET = socket.AF_INET
114        STREAM = socket.SOCK_STREAM
115        TCP = socket.IPPROTO_TCP
116
117        self.assertEqual(
118            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
119            base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))
120
121        self.assertEqual(
122            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
123            base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))
124
125        self.assertEqual(
126            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
127            base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))
128
129        self.assertEqual(
130            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
131            base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))
132
133        self.assertEqual(
134            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
135            base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))
136
137    @patch_socket
138    def test_ipaddr_info_no_inet_pton(self, m_socket):
139        del m_socket.inet_pton
140        self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
141                                                   socket.AF_INET,
142                                                   socket.SOCK_STREAM,
143                                                   socket.IPPROTO_TCP))
144
145
146class BaseEventLoopTests(test_utils.TestCase):
147
148    def setUp(self):
149        super().setUp()
150        self.loop = base_events.BaseEventLoop()
151        self.loop._selector = mock.Mock()
152        self.loop._selector.select.return_value = ()
153        self.set_event_loop(self.loop)
154
155    def test_not_implemented(self):
156        m = mock.Mock()
157        self.assertRaises(
158            NotImplementedError,
159            self.loop._make_socket_transport, m, m)
160        self.assertRaises(
161            NotImplementedError,
162            self.loop._make_ssl_transport, m, m, m, m)
163        self.assertRaises(
164            NotImplementedError,
165            self.loop._make_datagram_transport, m, m)
166        self.assertRaises(
167            NotImplementedError, self.loop._process_events, [])
168        self.assertRaises(
169            NotImplementedError, self.loop._write_to_self)
170        self.assertRaises(
171            NotImplementedError,
172            self.loop._make_read_pipe_transport, m, m)
173        self.assertRaises(
174            NotImplementedError,
175            self.loop._make_write_pipe_transport, m, m)
176        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
177        with self.assertRaises(NotImplementedError):
178            gen.send(None)
179
180    def test_close(self):
181        self.assertFalse(self.loop.is_closed())
182        self.loop.close()
183        self.assertTrue(self.loop.is_closed())
184
185        # it should be possible to call close() more than once
186        self.loop.close()
187        self.loop.close()
188
189        # operation blocked when the loop is closed
190        f = asyncio.Future(loop=self.loop)
191        self.assertRaises(RuntimeError, self.loop.run_forever)
192        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
193
194    def test__add_callback_handle(self):
195        h = asyncio.Handle(lambda: False, (), self.loop, None)
196
197        self.loop._add_callback(h)
198        self.assertFalse(self.loop._scheduled)
199        self.assertIn(h, self.loop._ready)
200
201    def test__add_callback_cancelled_handle(self):
202        h = asyncio.Handle(lambda: False, (), self.loop, None)
203        h.cancel()
204
205        self.loop._add_callback(h)
206        self.assertFalse(self.loop._scheduled)
207        self.assertFalse(self.loop._ready)
208
209    def test_set_default_executor(self):
210        executor = mock.Mock()
211        self.loop.set_default_executor(executor)
212        self.assertIs(executor, self.loop._default_executor)
213
214    def test_call_soon(self):
215        def cb():
216            pass
217
218        h = self.loop.call_soon(cb)
219        self.assertEqual(h._callback, cb)
220        self.assertIsInstance(h, asyncio.Handle)
221        self.assertIn(h, self.loop._ready)
222
223    def test_call_soon_non_callable(self):
224        self.loop.set_debug(True)
225        with self.assertRaisesRegex(TypeError, 'a callable object'):
226            self.loop.call_soon(1)
227
228    def test_call_later(self):
229        def cb():
230            pass
231
232        h = self.loop.call_later(10.0, cb)
233        self.assertIsInstance(h, asyncio.TimerHandle)
234        self.assertIn(h, self.loop._scheduled)
235        self.assertNotIn(h, self.loop._ready)
236
237    def test_call_later_negative_delays(self):
238        calls = []
239
240        def cb(arg):
241            calls.append(arg)
242
243        self.loop._process_events = mock.Mock()
244        self.loop.call_later(-1, cb, 'a')
245        self.loop.call_later(-2, cb, 'b')
246        test_utils.run_briefly(self.loop)
247        self.assertEqual(calls, ['b', 'a'])
248
249    def test_time_and_call_at(self):
250        def cb():
251            self.loop.stop()
252
253        self.loop._process_events = mock.Mock()
254        delay = 0.1
255
256        when = self.loop.time() + delay
257        self.loop.call_at(when, cb)
258        t0 = self.loop.time()
259        self.loop.run_forever()
260        dt = self.loop.time() - t0
261
262        # 50 ms: maximum granularity of the event loop
263        self.assertGreaterEqual(dt, delay - 0.050, dt)
264        # tolerate a difference of +800 ms because some Python buildbots
265        # are really slow
266        self.assertLessEqual(dt, 0.9, dt)
267
268    def check_thread(self, loop, debug):
269        def cb():
270            pass
271
272        loop.set_debug(debug)
273        if debug:
274            msg = ("Non-thread-safe operation invoked on an event loop other "
275                   "than the current one")
276            with self.assertRaisesRegex(RuntimeError, msg):
277                loop.call_soon(cb)
278            with self.assertRaisesRegex(RuntimeError, msg):
279                loop.call_later(60, cb)
280            with self.assertRaisesRegex(RuntimeError, msg):
281                loop.call_at(loop.time() + 60, cb)
282        else:
283            loop.call_soon(cb)
284            loop.call_later(60, cb)
285            loop.call_at(loop.time() + 60, cb)
286
287    def test_check_thread(self):
288        def check_in_thread(loop, event, debug, create_loop, fut):
289            # wait until the event loop is running
290            event.wait()
291
292            try:
293                if create_loop:
294                    loop2 = base_events.BaseEventLoop()
295                    try:
296                        asyncio.set_event_loop(loop2)
297                        self.check_thread(loop, debug)
298                    finally:
299                        asyncio.set_event_loop(None)
300                        loop2.close()
301                else:
302                    self.check_thread(loop, debug)
303            except Exception as exc:
304                loop.call_soon_threadsafe(fut.set_exception, exc)
305            else:
306                loop.call_soon_threadsafe(fut.set_result, None)
307
308        def test_thread(loop, debug, create_loop=False):
309            event = threading.Event()
310            fut = asyncio.Future(loop=loop)
311            loop.call_soon(event.set)
312            args = (loop, event, debug, create_loop, fut)
313            thread = threading.Thread(target=check_in_thread, args=args)
314            thread.start()
315            loop.run_until_complete(fut)
316            thread.join()
317
318        self.loop._process_events = mock.Mock()
319        self.loop._write_to_self = mock.Mock()
320
321        # raise RuntimeError if the thread has no event loop
322        test_thread(self.loop, True)
323
324        # check disabled if debug mode is disabled
325        test_thread(self.loop, False)
326
327        # raise RuntimeError if the event loop of the thread is not the called
328        # event loop
329        test_thread(self.loop, True, create_loop=True)
330
331        # check disabled if debug mode is disabled
332        test_thread(self.loop, False, create_loop=True)
333
334    def test__run_once(self):
335        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
336                                 self.loop, None)
337        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
338                                 self.loop, None)
339
340        h1.cancel()
341
342        self.loop._process_events = mock.Mock()
343        self.loop._scheduled.append(h1)
344        self.loop._scheduled.append(h2)
345        self.loop._run_once()
346
347        t = self.loop._selector.select.call_args[0][0]
348        self.assertTrue(9.5 < t < 10.5, t)
349        self.assertEqual([h2], self.loop._scheduled)
350        self.assertTrue(self.loop._process_events.called)
351
352    def test_set_debug(self):
353        self.loop.set_debug(True)
354        self.assertTrue(self.loop.get_debug())
355        self.loop.set_debug(False)
356        self.assertFalse(self.loop.get_debug())
357
358    @mock.patch('asyncio.base_events.logger')
359    def test__run_once_logging(self, m_logger):
360        def slow_select(timeout):
361            # Sleep a bit longer than a second to avoid timer resolution
362            # issues.
363            time.sleep(1.1)
364            return []
365
366        # logging needs debug flag
367        self.loop.set_debug(True)
368
369        # Log to INFO level if timeout > 1.0 sec.
370        self.loop._selector.select = slow_select
371        self.loop._process_events = mock.Mock()
372        self.loop._run_once()
373        self.assertEqual(logging.INFO, m_logger.log.call_args[0][0])
374
375        def fast_select(timeout):
376            time.sleep(0.001)
377            return []
378
379        self.loop._selector.select = fast_select
380        self.loop._run_once()
381        self.assertEqual(logging.DEBUG, m_logger.log.call_args[0][0])
382
383    def test__run_once_schedule_handle(self):
384        handle = None
385        processed = False
386
387        def cb(loop):
388            nonlocal processed, handle
389            processed = True
390            handle = loop.call_soon(lambda: True)
391
392        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
393                                self.loop, None)
394
395        self.loop._process_events = mock.Mock()
396        self.loop._scheduled.append(h)
397        self.loop._run_once()
398
399        self.assertTrue(processed)
400        self.assertEqual([handle], list(self.loop._ready))
401
402    def test__run_once_cancelled_event_cleanup(self):
403        self.loop._process_events = mock.Mock()
404
405        self.assertTrue(
406            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
407
408        def cb():
409            pass
410
411        # Set up one "blocking" event that will not be cancelled to
412        # ensure later cancelled events do not make it to the head
413        # of the queue and get cleaned.
414        not_cancelled_count = 1
415        self.loop.call_later(3000, cb)
416
417        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
418        # cancelled handles, ensure they aren't removed
419
420        cancelled_count = 2
421        for x in range(2):
422            h = self.loop.call_later(3600, cb)
423            h.cancel()
424
425        # Add some cancelled events that will be at head and removed
426        cancelled_count += 2
427        for x in range(2):
428            h = self.loop.call_later(100, cb)
429            h.cancel()
430
431        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
432        self.assertLessEqual(cancelled_count + not_cancelled_count,
433            base_events._MIN_SCHEDULED_TIMER_HANDLES)
434
435        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
436
437        self.loop._run_once()
438
439        cancelled_count -= 2
440
441        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
442
443        self.assertEqual(len(self.loop._scheduled),
444            cancelled_count + not_cancelled_count)
445
446        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
447        # so that deletion of cancelled events will occur on next _run_once
448        add_cancel_count = int(math.ceil(
449            base_events._MIN_SCHEDULED_TIMER_HANDLES *
450            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
451
452        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
453            add_cancel_count, 0)
454
455        # Add some events that will not be cancelled
456        not_cancelled_count += add_not_cancel_count
457        for x in range(add_not_cancel_count):
458            self.loop.call_later(3600, cb)
459
460        # Add enough cancelled events
461        cancelled_count += add_cancel_count
462        for x in range(add_cancel_count):
463            h = self.loop.call_later(3600, cb)
464            h.cancel()
465
466        # Ensure all handles are still scheduled
467        self.assertEqual(len(self.loop._scheduled),
468            cancelled_count + not_cancelled_count)
469
470        self.loop._run_once()
471
472        # Ensure cancelled events were removed
473        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
474
475        # Ensure only uncancelled events remain scheduled
476        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
477
478    def test_run_until_complete_type_error(self):
479        self.assertRaises(TypeError,
480            self.loop.run_until_complete, 'blah')
481
482    def test_run_until_complete_loop(self):
483        task = asyncio.Future(loop=self.loop)
484        other_loop = self.new_test_loop()
485        self.addCleanup(other_loop.close)
486        self.assertRaises(ValueError,
487            other_loop.run_until_complete, task)
488
489    def test_run_until_complete_loop_orphan_future_close_loop(self):
490        class ShowStopper(BaseException):
491            pass
492
493        async def foo(delay):
494            await asyncio.sleep(delay, loop=self.loop)
495
496        def throw():
497            raise ShowStopper
498
499        self.loop._process_events = mock.Mock()
500        self.loop.call_soon(throw)
501        try:
502            self.loop.run_until_complete(foo(0.1))
503        except ShowStopper:
504            pass
505
506        # This call fails if run_until_complete does not clean up
507        # done-callback for the previous future.
508        self.loop.run_until_complete(foo(0.2))
509
510    def test_subprocess_exec_invalid_args(self):
511        args = [sys.executable, '-c', 'pass']
512
513        # missing program parameter (empty args)
514        self.assertRaises(TypeError,
515            self.loop.run_until_complete, self.loop.subprocess_exec,
516            asyncio.SubprocessProtocol)
517
518        # expected multiple arguments, not a list
519        self.assertRaises(TypeError,
520            self.loop.run_until_complete, self.loop.subprocess_exec,
521            asyncio.SubprocessProtocol, args)
522
523        # program arguments must be strings, not int
524        self.assertRaises(TypeError,
525            self.loop.run_until_complete, self.loop.subprocess_exec,
526            asyncio.SubprocessProtocol, sys.executable, 123)
527
528        # universal_newlines, shell, bufsize must not be set
529        self.assertRaises(TypeError,
530        self.loop.run_until_complete, self.loop.subprocess_exec,
531            asyncio.SubprocessProtocol, *args, universal_newlines=True)
532        self.assertRaises(TypeError,
533            self.loop.run_until_complete, self.loop.subprocess_exec,
534            asyncio.SubprocessProtocol, *args, shell=True)
535        self.assertRaises(TypeError,
536            self.loop.run_until_complete, self.loop.subprocess_exec,
537            asyncio.SubprocessProtocol, *args, bufsize=4096)
538
539    def test_subprocess_shell_invalid_args(self):
540        # expected a string, not an int or a list
541        self.assertRaises(TypeError,
542            self.loop.run_until_complete, self.loop.subprocess_shell,
543            asyncio.SubprocessProtocol, 123)
544        self.assertRaises(TypeError,
545            self.loop.run_until_complete, self.loop.subprocess_shell,
546            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
547
548        # universal_newlines, shell, bufsize must not be set
549        self.assertRaises(TypeError,
550            self.loop.run_until_complete, self.loop.subprocess_shell,
551            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
552        self.assertRaises(TypeError,
553            self.loop.run_until_complete, self.loop.subprocess_shell,
554            asyncio.SubprocessProtocol, 'exit 0', shell=True)
555        self.assertRaises(TypeError,
556            self.loop.run_until_complete, self.loop.subprocess_shell,
557            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
558
559    def test_default_exc_handler_callback(self):
560        self.loop._process_events = mock.Mock()
561
562        def zero_error(fut):
563            fut.set_result(True)
564            1/0
565
566        # Test call_soon (events.Handle)
567        with mock.patch('asyncio.base_events.logger') as log:
568            fut = asyncio.Future(loop=self.loop)
569            self.loop.call_soon(zero_error, fut)
570            fut.add_done_callback(lambda fut: self.loop.stop())
571            self.loop.run_forever()
572            log.error.assert_called_with(
573                test_utils.MockPattern('Exception in callback.*zero'),
574                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
575
576        # Test call_later (events.TimerHandle)
577        with mock.patch('asyncio.base_events.logger') as log:
578            fut = asyncio.Future(loop=self.loop)
579            self.loop.call_later(0.01, zero_error, fut)
580            fut.add_done_callback(lambda fut: self.loop.stop())
581            self.loop.run_forever()
582            log.error.assert_called_with(
583                test_utils.MockPattern('Exception in callback.*zero'),
584                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
585
586    def test_default_exc_handler_coro(self):
587        self.loop._process_events = mock.Mock()
588
589        @asyncio.coroutine
590        def zero_error_coro():
591            yield from asyncio.sleep(0.01, loop=self.loop)
592            1/0
593
594        # Test Future.__del__
595        with mock.patch('asyncio.base_events.logger') as log:
596            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
597            fut.add_done_callback(lambda *args: self.loop.stop())
598            self.loop.run_forever()
599            fut = None # Trigger Future.__del__ or futures._TracebackLogger
600            support.gc_collect()
601            if PY34:
602                # Future.__del__ in Python 3.4 logs error with
603                # an actual exception context
604                log.error.assert_called_with(
605                    test_utils.MockPattern('.*exception was never retrieved'),
606                    exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
607            else:
608                # futures._TracebackLogger logs only textual traceback
609                log.error.assert_called_with(
610                    test_utils.MockPattern(
611                        '.*exception was never retrieved.*ZeroDiv'),
612                    exc_info=False)
613
614    def test_set_exc_handler_invalid(self):
615        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
616            self.loop.set_exception_handler('spam')
617
618    def test_set_exc_handler_custom(self):
619        def zero_error():
620            1/0
621
622        def run_loop():
623            handle = self.loop.call_soon(zero_error)
624            self.loop._run_once()
625            return handle
626
627        self.loop.set_debug(True)
628        self.loop._process_events = mock.Mock()
629
630        self.assertIsNone(self.loop.get_exception_handler())
631        mock_handler = mock.Mock()
632        self.loop.set_exception_handler(mock_handler)
633        self.assertIs(self.loop.get_exception_handler(), mock_handler)
634        handle = run_loop()
635        mock_handler.assert_called_with(self.loop, {
636            'exception': MOCK_ANY,
637            'message': test_utils.MockPattern(
638                                'Exception in callback.*zero_error'),
639            'handle': handle,
640            'source_traceback': handle._source_traceback,
641        })
642        mock_handler.reset_mock()
643
644        self.loop.set_exception_handler(None)
645        with mock.patch('asyncio.base_events.logger') as log:
646            run_loop()
647            log.error.assert_called_with(
648                        test_utils.MockPattern(
649                                'Exception in callback.*zero'),
650                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
651
652        assert not mock_handler.called
653
654    def test_set_exc_handler_broken(self):
655        def run_loop():
656            def zero_error():
657                1/0
658            self.loop.call_soon(zero_error)
659            self.loop._run_once()
660
661        def handler(loop, context):
662            raise AttributeError('spam')
663
664        self.loop._process_events = mock.Mock()
665
666        self.loop.set_exception_handler(handler)
667
668        with mock.patch('asyncio.base_events.logger') as log:
669            run_loop()
670            log.error.assert_called_with(
671                test_utils.MockPattern(
672                    'Unhandled error in exception handler'),
673                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
674
675    def test_default_exc_handler_broken(self):
676        _context = None
677
678        class Loop(base_events.BaseEventLoop):
679
680            _selector = mock.Mock()
681            _process_events = mock.Mock()
682
683            def default_exception_handler(self, context):
684                nonlocal _context
685                _context = context
686                # Simulates custom buggy "default_exception_handler"
687                raise ValueError('spam')
688
689        loop = Loop()
690        self.addCleanup(loop.close)
691        asyncio.set_event_loop(loop)
692
693        def run_loop():
694            def zero_error():
695                1/0
696            loop.call_soon(zero_error)
697            loop._run_once()
698
699        with mock.patch('asyncio.base_events.logger') as log:
700            run_loop()
701            log.error.assert_called_with(
702                'Exception in default exception handler',
703                exc_info=True)
704
705        def custom_handler(loop, context):
706            raise ValueError('ham')
707
708        _context = None
709        loop.set_exception_handler(custom_handler)
710        with mock.patch('asyncio.base_events.logger') as log:
711            run_loop()
712            log.error.assert_called_with(
713                test_utils.MockPattern('Exception in default exception.*'
714                                       'while handling.*in custom'),
715                exc_info=True)
716
717            # Check that original context was passed to default
718            # exception handler.
719            self.assertIn('context', _context)
720            self.assertIs(type(_context['context']['exception']),
721                          ZeroDivisionError)
722
723    def test_set_task_factory_invalid(self):
724        with self.assertRaisesRegex(
725            TypeError, 'task factory must be a callable or None'):
726
727            self.loop.set_task_factory(1)
728
729        self.assertIsNone(self.loop.get_task_factory())
730
731    def test_set_task_factory(self):
732        self.loop._process_events = mock.Mock()
733
734        class MyTask(asyncio.Task):
735            pass
736
737        @asyncio.coroutine
738        def coro():
739            pass
740
741        factory = lambda loop, coro: MyTask(coro, loop=loop)
742
743        self.assertIsNone(self.loop.get_task_factory())
744        self.loop.set_task_factory(factory)
745        self.assertIs(self.loop.get_task_factory(), factory)
746
747        task = self.loop.create_task(coro())
748        self.assertTrue(isinstance(task, MyTask))
749        self.loop.run_until_complete(task)
750
751        self.loop.set_task_factory(None)
752        self.assertIsNone(self.loop.get_task_factory())
753
754        task = self.loop.create_task(coro())
755        self.assertTrue(isinstance(task, asyncio.Task))
756        self.assertFalse(isinstance(task, MyTask))
757        self.loop.run_until_complete(task)
758
759    def test_env_var_debug(self):
760        code = '\n'.join((
761            'import asyncio',
762            'loop = asyncio.get_event_loop()',
763            'print(loop.get_debug())'))
764
765        # Test with -E to not fail if the unit test was run with
766        # PYTHONASYNCIODEBUG set to a non-empty string
767        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
768        self.assertEqual(stdout.rstrip(), b'False')
769
770        sts, stdout, stderr = assert_python_ok('-c', code,
771                                               PYTHONASYNCIODEBUG='',
772                                               PYTHONDEVMODE='')
773        self.assertEqual(stdout.rstrip(), b'False')
774
775        sts, stdout, stderr = assert_python_ok('-c', code,
776                                               PYTHONASYNCIODEBUG='1',
777                                               PYTHONDEVMODE='')
778        self.assertEqual(stdout.rstrip(), b'True')
779
780        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
781                                               PYTHONASYNCIODEBUG='1')
782        self.assertEqual(stdout.rstrip(), b'False')
783
784        # -X dev
785        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
786                                               '-c', code)
787        self.assertEqual(stdout.rstrip(), b'True')
788
789    def test_create_task(self):
790        class MyTask(asyncio.Task):
791            pass
792
793        @asyncio.coroutine
794        def test():
795            pass
796
797        class EventLoop(base_events.BaseEventLoop):
798            def create_task(self, coro):
799                return MyTask(coro, loop=loop)
800
801        loop = EventLoop()
802        self.set_event_loop(loop)
803
804        coro = test()
805        task = asyncio.ensure_future(coro, loop=loop)
806        self.assertIsInstance(task, MyTask)
807
808        # make warnings quiet
809        task._log_destroy_pending = False
810        coro.close()
811
812    def test_run_forever_keyboard_interrupt(self):
813        # Python issue #22601: ensure that the temporary task created by
814        # run_forever() consumes the KeyboardInterrupt and so don't log
815        # a warning
816        @asyncio.coroutine
817        def raise_keyboard_interrupt():
818            raise KeyboardInterrupt
819
820        self.loop._process_events = mock.Mock()
821        self.loop.call_exception_handler = mock.Mock()
822
823        try:
824            self.loop.run_until_complete(raise_keyboard_interrupt())
825        except KeyboardInterrupt:
826            pass
827        self.loop.close()
828        support.gc_collect()
829
830        self.assertFalse(self.loop.call_exception_handler.called)
831
832    def test_run_until_complete_baseexception(self):
833        # Python issue #22429: run_until_complete() must not schedule a pending
834        # call to stop() if the future raised a BaseException
835        @asyncio.coroutine
836        def raise_keyboard_interrupt():
837            raise KeyboardInterrupt
838
839        self.loop._process_events = mock.Mock()
840
841        try:
842            self.loop.run_until_complete(raise_keyboard_interrupt())
843        except KeyboardInterrupt:
844            pass
845
846        def func():
847            self.loop.stop()
848            func.called = True
849        func.called = False
850        try:
851            self.loop.call_soon(func)
852            self.loop.run_forever()
853        except KeyboardInterrupt:
854            pass
855        self.assertTrue(func.called)
856
857    def test_single_selecter_event_callback_after_stopping(self):
858        # Python issue #25593: A stopped event loop may cause event callbacks
859        # to run more than once.
860        event_sentinel = object()
861        callcount = 0
862        doer = None
863
864        def proc_events(event_list):
865            nonlocal doer
866            if event_sentinel in event_list:
867                doer = self.loop.call_soon(do_event)
868
869        def do_event():
870            nonlocal callcount
871            callcount += 1
872            self.loop.call_soon(clear_selector)
873
874        def clear_selector():
875            doer.cancel()
876            self.loop._selector.select.return_value = ()
877
878        self.loop._process_events = proc_events
879        self.loop._selector.select.return_value = (event_sentinel,)
880
881        for i in range(1, 3):
882            with self.subTest('Loop %d/2' % i):
883                self.loop.call_soon(self.loop.stop)
884                self.loop.run_forever()
885                self.assertEqual(callcount, 1)
886
887    def test_run_once(self):
888        # Simple test for test_utils.run_once().  It may seem strange
889        # to have a test for this (the function isn't even used!) but
890        # it's a de-factor standard API for library tests.  This tests
891        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
892        count = 0
893
894        def callback():
895            nonlocal count
896            count += 1
897
898        self.loop._process_events = mock.Mock()
899        self.loop.call_soon(callback)
900        test_utils.run_once(self.loop)
901        self.assertEqual(count, 1)
902
903    def test_run_forever_pre_stopped(self):
904        # Test that the old idiom for pre-stopping the loop works.
905        self.loop._process_events = mock.Mock()
906        self.loop.stop()
907        self.loop.run_forever()
908        self.loop._selector.select.assert_called_once_with(0)
909
910    async def leave_unfinalized_asyncgen(self):
911        # Create an async generator, iterate it partially, and leave it
912        # to be garbage collected.
913        # Used in async generator finalization tests.
914        # Depends on implementation details of garbage collector. Changes
915        # in gc may break this function.
916        status = {'started': False,
917                  'stopped': False,
918                  'finalized': False}
919
920        async def agen():
921            status['started'] = True
922            try:
923                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
924                    yield item
925            finally:
926                status['finalized'] = True
927
928        ag = agen()
929        ai = ag.__aiter__()
930
931        async def iter_one():
932            try:
933                item = await ai.__anext__()
934            except StopAsyncIteration:
935                return
936            if item == 'THREE':
937                status['stopped'] = True
938                return
939            asyncio.create_task(iter_one())
940
941        asyncio.create_task(iter_one())
942        return status
943
944    def test_asyncgen_finalization_by_gc(self):
945        # Async generators should be finalized when garbage collected.
946        self.loop._process_events = mock.Mock()
947        self.loop._write_to_self = mock.Mock()
948        with support.disable_gc():
949            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
950            while not status['stopped']:
951                test_utils.run_briefly(self.loop)
952            self.assertTrue(status['started'])
953            self.assertTrue(status['stopped'])
954            self.assertFalse(status['finalized'])
955            support.gc_collect()
956            test_utils.run_briefly(self.loop)
957            self.assertTrue(status['finalized'])
958
959    def test_asyncgen_finalization_by_gc_in_other_thread(self):
960        # Python issue 34769: If garbage collector runs in another
961        # thread, async generators will not finalize in debug
962        # mode.
963        self.loop._process_events = mock.Mock()
964        self.loop._write_to_self = mock.Mock()
965        self.loop.set_debug(True)
966        with support.disable_gc():
967            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
968            while not status['stopped']:
969                test_utils.run_briefly(self.loop)
970            self.assertTrue(status['started'])
971            self.assertTrue(status['stopped'])
972            self.assertFalse(status['finalized'])
973            self.loop.run_until_complete(
974                self.loop.run_in_executor(None, support.gc_collect))
975            test_utils.run_briefly(self.loop)
976            self.assertTrue(status['finalized'])
977
978
979class MyProto(asyncio.Protocol):
980    done = None
981
982    def __init__(self, create_future=False):
983        self.state = 'INITIAL'
984        self.nbytes = 0
985        if create_future:
986            self.done = asyncio.Future()
987
988    def connection_made(self, transport):
989        self.transport = transport
990        assert self.state == 'INITIAL', self.state
991        self.state = 'CONNECTED'
992        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
993
994    def data_received(self, data):
995        assert self.state == 'CONNECTED', self.state
996        self.nbytes += len(data)
997
998    def eof_received(self):
999        assert self.state == 'CONNECTED', self.state
1000        self.state = 'EOF'
1001
1002    def connection_lost(self, exc):
1003        assert self.state in ('CONNECTED', 'EOF'), self.state
1004        self.state = 'CLOSED'
1005        if self.done:
1006            self.done.set_result(None)
1007
1008
1009class MyDatagramProto(asyncio.DatagramProtocol):
1010    done = None
1011
1012    def __init__(self, create_future=False, loop=None):
1013        self.state = 'INITIAL'
1014        self.nbytes = 0
1015        if create_future:
1016            self.done = asyncio.Future(loop=loop)
1017
1018    def connection_made(self, transport):
1019        self.transport = transport
1020        assert self.state == 'INITIAL', self.state
1021        self.state = 'INITIALIZED'
1022
1023    def datagram_received(self, data, addr):
1024        assert self.state == 'INITIALIZED', self.state
1025        self.nbytes += len(data)
1026
1027    def error_received(self, exc):
1028        assert self.state == 'INITIALIZED', self.state
1029
1030    def connection_lost(self, exc):
1031        assert self.state == 'INITIALIZED', self.state
1032        self.state = 'CLOSED'
1033        if self.done:
1034            self.done.set_result(None)
1035
1036
1037class BaseEventLoopWithSelectorTests(test_utils.TestCase):
1038
1039    def setUp(self):
1040        super().setUp()
1041        self.loop = asyncio.new_event_loop()
1042        self.set_event_loop(self.loop)
1043
1044    @mock.patch('socket.getnameinfo')
1045    def test_getnameinfo(self, m_gai):
1046        m_gai.side_effect = lambda *args: 42
1047        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
1048        self.assertEqual(r, 42)
1049
1050    @patch_socket
1051    def test_create_connection_multiple_errors(self, m_socket):
1052
1053        class MyProto(asyncio.Protocol):
1054            pass
1055
1056        @asyncio.coroutine
1057        def getaddrinfo(*args, **kw):
1058            yield from []
1059            return [(2, 1, 6, '', ('107.6.106.82', 80)),
1060                    (2, 1, 6, '', ('107.6.106.82', 80))]
1061
1062        def getaddrinfo_task(*args, **kwds):
1063            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1064
1065        idx = -1
1066        errors = ['err1', 'err2']
1067
1068        def _socket(*args, **kw):
1069            nonlocal idx, errors
1070            idx += 1
1071            raise OSError(errors[idx])
1072
1073        m_socket.socket = _socket
1074
1075        self.loop.getaddrinfo = getaddrinfo_task
1076
1077        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1078        with self.assertRaises(OSError) as cm:
1079            self.loop.run_until_complete(coro)
1080
1081        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
1082
1083    @patch_socket
1084    def test_create_connection_timeout(self, m_socket):
1085        # Ensure that the socket is closed on timeout
1086        sock = mock.Mock()
1087        m_socket.socket.return_value = sock
1088
1089        def getaddrinfo(*args, **kw):
1090            fut = asyncio.Future(loop=self.loop)
1091            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
1092                    ('127.0.0.1', 80))
1093            fut.set_result([addr])
1094            return fut
1095        self.loop.getaddrinfo = getaddrinfo
1096
1097        with mock.patch.object(self.loop, 'sock_connect',
1098                               side_effect=asyncio.TimeoutError):
1099            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
1100            with self.assertRaises(asyncio.TimeoutError):
1101                self.loop.run_until_complete(coro)
1102            self.assertTrue(sock.close.called)
1103
1104    def test_create_connection_host_port_sock(self):
1105        coro = self.loop.create_connection(
1106            MyProto, 'example.com', 80, sock=object())
1107        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1108
1109    def test_create_connection_wrong_sock(self):
1110        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1111        with sock:
1112            coro = self.loop.create_connection(MyProto, sock=sock)
1113            with self.assertRaisesRegex(ValueError,
1114                                        'A Stream Socket was expected'):
1115                self.loop.run_until_complete(coro)
1116
1117    def test_create_server_wrong_sock(self):
1118        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1119        with sock:
1120            coro = self.loop.create_server(MyProto, sock=sock)
1121            with self.assertRaisesRegex(ValueError,
1122                                        'A Stream Socket was expected'):
1123                self.loop.run_until_complete(coro)
1124
1125    def test_create_server_ssl_timeout_for_plain_socket(self):
1126        coro = self.loop.create_server(
1127            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1128        with self.assertRaisesRegex(
1129                ValueError,
1130                'ssl_handshake_timeout is only meaningful with ssl'):
1131            self.loop.run_until_complete(coro)
1132
1133    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
1134                         'no socket.SOCK_NONBLOCK (linux only)')
1135    def test_create_server_stream_bittype(self):
1136        sock = socket.socket(
1137            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
1138        with sock:
1139            coro = self.loop.create_server(lambda: None, sock=sock)
1140            srv = self.loop.run_until_complete(coro)
1141            srv.close()
1142            self.loop.run_until_complete(srv.wait_closed())
1143
1144    @unittest.skipUnless(hasattr(socket, 'AF_INET6'), 'no IPv6 support')
1145    def test_create_server_ipv6(self):
1146        async def main():
1147            srv = await asyncio.start_server(
1148                lambda: None, '::1', 0, loop=self.loop)
1149            try:
1150                self.assertGreater(len(srv.sockets), 0)
1151            finally:
1152                srv.close()
1153                await srv.wait_closed()
1154
1155        try:
1156            self.loop.run_until_complete(main())
1157        except OSError as ex:
1158            if (hasattr(errno, 'EADDRNOTAVAIL') and
1159                    ex.errno == errno.EADDRNOTAVAIL):
1160                self.skipTest('failed to bind to ::1')
1161            else:
1162                raise
1163
1164    def test_create_datagram_endpoint_wrong_sock(self):
1165        sock = socket.socket(socket.AF_INET)
1166        with sock:
1167            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
1168            with self.assertRaisesRegex(ValueError,
1169                                        'A UDP Socket was expected'):
1170                self.loop.run_until_complete(coro)
1171
1172    def test_create_connection_no_host_port_sock(self):
1173        coro = self.loop.create_connection(MyProto)
1174        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1175
1176    def test_create_connection_no_getaddrinfo(self):
1177        @asyncio.coroutine
1178        def getaddrinfo(*args, **kw):
1179            yield from []
1180
1181        def getaddrinfo_task(*args, **kwds):
1182            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1183
1184        self.loop.getaddrinfo = getaddrinfo_task
1185        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1186        self.assertRaises(
1187            OSError, self.loop.run_until_complete, coro)
1188
1189    def test_create_connection_connect_err(self):
1190        async def getaddrinfo(*args, **kw):
1191            return [(2, 1, 6, '', ('107.6.106.82', 80))]
1192
1193        def getaddrinfo_task(*args, **kwds):
1194            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1195
1196        self.loop.getaddrinfo = getaddrinfo_task
1197        self.loop.sock_connect = mock.Mock()
1198        self.loop.sock_connect.side_effect = OSError
1199
1200        coro = self.loop.create_connection(MyProto, 'example.com', 80)
1201        self.assertRaises(
1202            OSError, self.loop.run_until_complete, coro)
1203
1204    def test_create_connection_multiple(self):
1205        @asyncio.coroutine
1206        def getaddrinfo(*args, **kw):
1207            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1208                    (2, 1, 6, '', ('0.0.0.2', 80))]
1209
1210        def getaddrinfo_task(*args, **kwds):
1211            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1212
1213        self.loop.getaddrinfo = getaddrinfo_task
1214        self.loop.sock_connect = mock.Mock()
1215        self.loop.sock_connect.side_effect = OSError
1216
1217        coro = self.loop.create_connection(
1218            MyProto, 'example.com', 80, family=socket.AF_INET)
1219        with self.assertRaises(OSError):
1220            self.loop.run_until_complete(coro)
1221
1222    @patch_socket
1223    def test_create_connection_multiple_errors_local_addr(self, m_socket):
1224
1225        def bind(addr):
1226            if addr[0] == '0.0.0.1':
1227                err = OSError('Err')
1228                err.strerror = 'Err'
1229                raise err
1230
1231        m_socket.socket.return_value.bind = bind
1232
1233        @asyncio.coroutine
1234        def getaddrinfo(*args, **kw):
1235            return [(2, 1, 6, '', ('0.0.0.1', 80)),
1236                    (2, 1, 6, '', ('0.0.0.2', 80))]
1237
1238        def getaddrinfo_task(*args, **kwds):
1239            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1240
1241        self.loop.getaddrinfo = getaddrinfo_task
1242        self.loop.sock_connect = mock.Mock()
1243        self.loop.sock_connect.side_effect = OSError('Err2')
1244
1245        coro = self.loop.create_connection(
1246            MyProto, 'example.com', 80, family=socket.AF_INET,
1247            local_addr=(None, 8080))
1248        with self.assertRaises(OSError) as cm:
1249            self.loop.run_until_complete(coro)
1250
1251        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
1252        self.assertTrue(m_socket.socket.return_value.close.called)
1253
1254    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
1255        # Test the fallback code, even if this system has inet_pton.
1256        if not allow_inet_pton:
1257            del m_socket.inet_pton
1258
1259        m_socket.getaddrinfo = socket.getaddrinfo
1260        sock = m_socket.socket.return_value
1261
1262        self.loop._add_reader = mock.Mock()
1263        self.loop._add_reader._is_coroutine = False
1264        self.loop._add_writer = mock.Mock()
1265        self.loop._add_writer._is_coroutine = False
1266
1267        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
1268        t, p = self.loop.run_until_complete(coro)
1269        try:
1270            sock.connect.assert_called_with(('1.2.3.4', 80))
1271            _, kwargs = m_socket.socket.call_args
1272            self.assertEqual(kwargs['family'], m_socket.AF_INET)
1273            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1274        finally:
1275            t.close()
1276            test_utils.run_briefly(self.loop)  # allow transport to close
1277
1278        sock.family = socket.AF_INET6
1279        coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
1280        t, p = self.loop.run_until_complete(coro)
1281        try:
1282            # Without inet_pton we use getaddrinfo, which transforms ('::1', 80)
1283            # to ('::1', 80, 0, 0). The last 0s are flow info, scope id.
1284            [address] = sock.connect.call_args[0]
1285            host, port = address[:2]
1286            self.assertRegex(host, r'::(0\.)*1')
1287            self.assertEqual(port, 80)
1288            _, kwargs = m_socket.socket.call_args
1289            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
1290            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1291        finally:
1292            t.close()
1293            test_utils.run_briefly(self.loop)  # allow transport to close
1294
1295    @patch_socket
1296    def test_create_connection_ip_addr(self, m_socket):
1297        self._test_create_connection_ip_addr(m_socket, True)
1298
1299    @patch_socket
1300    def test_create_connection_no_inet_pton(self, m_socket):
1301        self._test_create_connection_ip_addr(m_socket, False)
1302
1303    @patch_socket
1304    def test_create_connection_service_name(self, m_socket):
1305        m_socket.getaddrinfo = socket.getaddrinfo
1306        sock = m_socket.socket.return_value
1307
1308        self.loop._add_reader = mock.Mock()
1309        self.loop._add_reader._is_coroutine = False
1310        self.loop._add_writer = mock.Mock()
1311        self.loop._add_writer._is_coroutine = False
1312
1313        for service, port in ('http', 80), (b'http', 80):
1314            coro = self.loop.create_connection(asyncio.Protocol,
1315                                               '127.0.0.1', service)
1316
1317            t, p = self.loop.run_until_complete(coro)
1318            try:
1319                sock.connect.assert_called_with(('127.0.0.1', port))
1320                _, kwargs = m_socket.socket.call_args
1321                self.assertEqual(kwargs['family'], m_socket.AF_INET)
1322                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
1323            finally:
1324                t.close()
1325                test_utils.run_briefly(self.loop)  # allow transport to close
1326
1327        for service in 'nonsense', b'nonsense':
1328            coro = self.loop.create_connection(asyncio.Protocol,
1329                                               '127.0.0.1', service)
1330
1331            with self.assertRaises(OSError):
1332                self.loop.run_until_complete(coro)
1333
1334    def test_create_connection_no_local_addr(self):
1335        @asyncio.coroutine
1336        def getaddrinfo(host, *args, **kw):
1337            if host == 'example.com':
1338                return [(2, 1, 6, '', ('107.6.106.82', 80)),
1339                        (2, 1, 6, '', ('107.6.106.82', 80))]
1340            else:
1341                return []
1342
1343        def getaddrinfo_task(*args, **kwds):
1344            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1345        self.loop.getaddrinfo = getaddrinfo_task
1346
1347        coro = self.loop.create_connection(
1348            MyProto, 'example.com', 80, family=socket.AF_INET,
1349            local_addr=(None, 8080))
1350        self.assertRaises(
1351            OSError, self.loop.run_until_complete, coro)
1352
1353    @patch_socket
1354    def test_create_connection_bluetooth(self, m_socket):
1355        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
1356        # we can't recognize an address is resolved, e.g. a Bluetooth address.
1357        addr = ('00:01:02:03:04:05', 1)
1358
1359        def getaddrinfo(host, port, *args, **kw):
1360            assert (host, port) == addr
1361            return [(999, 1, 999, '', (addr, 1))]
1362
1363        m_socket.getaddrinfo = getaddrinfo
1364        sock = m_socket.socket()
1365        coro = self.loop.sock_connect(sock, addr)
1366        self.loop.run_until_complete(coro)
1367
1368    def test_create_connection_ssl_server_hostname_default(self):
1369        self.loop.getaddrinfo = mock.Mock()
1370
1371        def mock_getaddrinfo(*args, **kwds):
1372            f = asyncio.Future(loop=self.loop)
1373            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
1374                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
1375            return f
1376
1377        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
1378        self.loop.sock_connect = mock.Mock()
1379        self.loop.sock_connect.return_value = self.loop.create_future()
1380        self.loop.sock_connect.return_value.set_result(None)
1381        self.loop._make_ssl_transport = mock.Mock()
1382
1383        class _SelectorTransportMock:
1384            _sock = None
1385
1386            def get_extra_info(self, key):
1387                return mock.Mock()
1388
1389            def close(self):
1390                self._sock.close()
1391
1392        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
1393                                    **kwds):
1394            waiter.set_result(None)
1395            transport = _SelectorTransportMock()
1396            transport._sock = sock
1397            return transport
1398
1399        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
1400        ANY = mock.ANY
1401        handshake_timeout = object()
1402        # First try the default server_hostname.
1403        self.loop._make_ssl_transport.reset_mock()
1404        coro = self.loop.create_connection(
1405                MyProto, 'python.org', 80, ssl=True,
1406                ssl_handshake_timeout=handshake_timeout)
1407        transport, _ = self.loop.run_until_complete(coro)
1408        transport.close()
1409        self.loop._make_ssl_transport.assert_called_with(
1410            ANY, ANY, ANY, ANY,
1411            server_side=False,
1412            server_hostname='python.org',
1413            ssl_handshake_timeout=handshake_timeout)
1414        # Next try an explicit server_hostname.
1415        self.loop._make_ssl_transport.reset_mock()
1416        coro = self.loop.create_connection(
1417                MyProto, 'python.org', 80, ssl=True,
1418                server_hostname='perl.com',
1419                ssl_handshake_timeout=handshake_timeout)
1420        transport, _ = self.loop.run_until_complete(coro)
1421        transport.close()
1422        self.loop._make_ssl_transport.assert_called_with(
1423            ANY, ANY, ANY, ANY,
1424            server_side=False,
1425            server_hostname='perl.com',
1426            ssl_handshake_timeout=handshake_timeout)
1427        # Finally try an explicit empty server_hostname.
1428        self.loop._make_ssl_transport.reset_mock()
1429        coro = self.loop.create_connection(
1430                MyProto, 'python.org', 80, ssl=True,
1431                server_hostname='',
1432                ssl_handshake_timeout=handshake_timeout)
1433        transport, _ = self.loop.run_until_complete(coro)
1434        transport.close()
1435        self.loop._make_ssl_transport.assert_called_with(
1436                ANY, ANY, ANY, ANY,
1437                server_side=False,
1438                server_hostname='',
1439                ssl_handshake_timeout=handshake_timeout)
1440
1441    def test_create_connection_no_ssl_server_hostname_errors(self):
1442        # When not using ssl, server_hostname must be None.
1443        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1444                                           server_hostname='')
1445        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1446        coro = self.loop.create_connection(MyProto, 'python.org', 80,
1447                                           server_hostname='python.org')
1448        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1449
1450    def test_create_connection_ssl_server_hostname_errors(self):
1451        # When using ssl, server_hostname may be None if host is non-empty.
1452        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
1453        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1454        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
1455        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1456        sock = socket.socket()
1457        coro = self.loop.create_connection(MyProto, None, None,
1458                                           ssl=True, sock=sock)
1459        self.addCleanup(sock.close)
1460        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1461
1462    def test_create_connection_ssl_timeout_for_plain_socket(self):
1463        coro = self.loop.create_connection(
1464            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
1465        with self.assertRaisesRegex(
1466                ValueError,
1467                'ssl_handshake_timeout is only meaningful with ssl'):
1468            self.loop.run_until_complete(coro)
1469
1470    def test_create_server_empty_host(self):
1471        # if host is empty string use None instead
1472        host = object()
1473
1474        @asyncio.coroutine
1475        def getaddrinfo(*args, **kw):
1476            nonlocal host
1477            host = args[0]
1478            yield from []
1479
1480        def getaddrinfo_task(*args, **kwds):
1481            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
1482
1483        self.loop.getaddrinfo = getaddrinfo_task
1484        fut = self.loop.create_server(MyProto, '', 0)
1485        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1486        self.assertIsNone(host)
1487
1488    def test_create_server_host_port_sock(self):
1489        fut = self.loop.create_server(
1490            MyProto, '0.0.0.0', 0, sock=object())
1491        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1492
1493    def test_create_server_no_host_port_sock(self):
1494        fut = self.loop.create_server(MyProto)
1495        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1496
1497    def test_create_server_no_getaddrinfo(self):
1498        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
1499        getaddrinfo.return_value = self.loop.create_future()
1500        getaddrinfo.return_value.set_result(None)
1501
1502        f = self.loop.create_server(MyProto, 'python.org', 0)
1503        self.assertRaises(OSError, self.loop.run_until_complete, f)
1504
1505    @patch_socket
1506    def test_create_server_nosoreuseport(self, m_socket):
1507        m_socket.getaddrinfo = socket.getaddrinfo
1508        del m_socket.SO_REUSEPORT
1509        m_socket.socket.return_value = mock.Mock()
1510
1511        f = self.loop.create_server(
1512            MyProto, '0.0.0.0', 0, reuse_port=True)
1513
1514        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1515
1516    @patch_socket
1517    def test_create_server_soreuseport_only_defined(self, m_socket):
1518        m_socket.getaddrinfo = socket.getaddrinfo
1519        m_socket.socket.return_value = mock.Mock()
1520        m_socket.SO_REUSEPORT = -1
1521
1522        f = self.loop.create_server(
1523            MyProto, '0.0.0.0', 0, reuse_port=True)
1524
1525        self.assertRaises(ValueError, self.loop.run_until_complete, f)
1526
1527    @patch_socket
1528    def test_create_server_cant_bind(self, m_socket):
1529
1530        class Err(OSError):
1531            strerror = 'error'
1532
1533        m_socket.getaddrinfo.return_value = [
1534            (2, 1, 6, '', ('127.0.0.1', 10100))]
1535        m_socket.getaddrinfo._is_coroutine = False
1536        m_sock = m_socket.socket.return_value = mock.Mock()
1537        m_sock.bind.side_effect = Err
1538
1539        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
1540        self.assertRaises(OSError, self.loop.run_until_complete, fut)
1541        self.assertTrue(m_sock.close.called)
1542
1543    @patch_socket
1544    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
1545        m_socket.getaddrinfo.return_value = []
1546        m_socket.getaddrinfo._is_coroutine = False
1547
1548        coro = self.loop.create_datagram_endpoint(
1549            MyDatagramProto, local_addr=('localhost', 0))
1550        self.assertRaises(
1551            OSError, self.loop.run_until_complete, coro)
1552
1553    def test_create_datagram_endpoint_addr_error(self):
1554        coro = self.loop.create_datagram_endpoint(
1555            MyDatagramProto, local_addr='localhost')
1556        self.assertRaises(
1557            AssertionError, self.loop.run_until_complete, coro)
1558        coro = self.loop.create_datagram_endpoint(
1559            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
1560        self.assertRaises(
1561            AssertionError, self.loop.run_until_complete, coro)
1562
1563    def test_create_datagram_endpoint_connect_err(self):
1564        self.loop.sock_connect = mock.Mock()
1565        self.loop.sock_connect.side_effect = OSError
1566
1567        coro = self.loop.create_datagram_endpoint(
1568            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
1569        self.assertRaises(
1570            OSError, self.loop.run_until_complete, coro)
1571
1572    @patch_socket
1573    def test_create_datagram_endpoint_socket_err(self, m_socket):
1574        m_socket.getaddrinfo = socket.getaddrinfo
1575        m_socket.socket.side_effect = OSError
1576
1577        coro = self.loop.create_datagram_endpoint(
1578            asyncio.DatagramProtocol, family=socket.AF_INET)
1579        self.assertRaises(
1580            OSError, self.loop.run_until_complete, coro)
1581
1582        coro = self.loop.create_datagram_endpoint(
1583            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
1584        self.assertRaises(
1585            OSError, self.loop.run_until_complete, coro)
1586
1587    @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
1588    def test_create_datagram_endpoint_no_matching_family(self):
1589        coro = self.loop.create_datagram_endpoint(
1590            asyncio.DatagramProtocol,
1591            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
1592        self.assertRaises(
1593            ValueError, self.loop.run_until_complete, coro)
1594
1595    @patch_socket
1596    def test_create_datagram_endpoint_setblk_err(self, m_socket):
1597        m_socket.socket.return_value.setblocking.side_effect = OSError
1598
1599        coro = self.loop.create_datagram_endpoint(
1600            asyncio.DatagramProtocol, family=socket.AF_INET)
1601        self.assertRaises(
1602            OSError, self.loop.run_until_complete, coro)
1603        self.assertTrue(
1604            m_socket.socket.return_value.close.called)
1605
1606    def test_create_datagram_endpoint_noaddr_nofamily(self):
1607        coro = self.loop.create_datagram_endpoint(
1608            asyncio.DatagramProtocol)
1609        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1610
1611    @patch_socket
1612    def test_create_datagram_endpoint_cant_bind(self, m_socket):
1613        class Err(OSError):
1614            pass
1615
1616        m_socket.getaddrinfo = socket.getaddrinfo
1617        m_sock = m_socket.socket.return_value = mock.Mock()
1618        m_sock.bind.side_effect = Err
1619
1620        fut = self.loop.create_datagram_endpoint(
1621            MyDatagramProto,
1622            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
1623        self.assertRaises(Err, self.loop.run_until_complete, fut)
1624        self.assertTrue(m_sock.close.called)
1625
1626    def test_create_datagram_endpoint_sock(self):
1627        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1628        sock.bind(('127.0.0.1', 0))
1629        fut = self.loop.create_datagram_endpoint(
1630            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1631            sock=sock)
1632        transport, protocol = self.loop.run_until_complete(fut)
1633        transport.close()
1634        self.loop.run_until_complete(protocol.done)
1635        self.assertEqual('CLOSED', protocol.state)
1636
1637    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
1638    def test_create_datagram_endpoint_sock_unix(self):
1639        fut = self.loop.create_datagram_endpoint(
1640            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1641            family=socket.AF_UNIX)
1642        transport, protocol = self.loop.run_until_complete(fut)
1643        assert transport._sock.family == socket.AF_UNIX
1644        transport.close()
1645        self.loop.run_until_complete(protocol.done)
1646        self.assertEqual('CLOSED', protocol.state)
1647
1648    def test_create_datagram_endpoint_sock_sockopts(self):
1649        class FakeSock:
1650            type = socket.SOCK_DGRAM
1651
1652        fut = self.loop.create_datagram_endpoint(
1653            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
1654        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1655
1656        fut = self.loop.create_datagram_endpoint(
1657            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
1658        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1659
1660        fut = self.loop.create_datagram_endpoint(
1661            MyDatagramProto, family=1, sock=FakeSock())
1662        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1663
1664        fut = self.loop.create_datagram_endpoint(
1665            MyDatagramProto, proto=1, sock=FakeSock())
1666        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1667
1668        fut = self.loop.create_datagram_endpoint(
1669            MyDatagramProto, flags=1, sock=FakeSock())
1670        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1671
1672        fut = self.loop.create_datagram_endpoint(
1673            MyDatagramProto, reuse_address=True, sock=FakeSock())
1674        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1675
1676        fut = self.loop.create_datagram_endpoint(
1677            MyDatagramProto, reuse_port=True, sock=FakeSock())
1678        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1679
1680        fut = self.loop.create_datagram_endpoint(
1681            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
1682        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
1683
1684    def test_create_datagram_endpoint_sockopts(self):
1685        # Socket options should not be applied unless asked for.
1686        # SO_REUSEADDR defaults to on for UNIX.
1687        # SO_REUSEPORT is not available on all platforms.
1688
1689        coro = self.loop.create_datagram_endpoint(
1690            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1691            local_addr=('127.0.0.1', 0))
1692        transport, protocol = self.loop.run_until_complete(coro)
1693        sock = transport.get_extra_info('socket')
1694
1695        reuse_address_default_on = (
1696            os.name == 'posix' and sys.platform != 'cygwin')
1697        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1698
1699        if reuse_address_default_on:
1700            self.assertTrue(
1701                sock.getsockopt(
1702                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
1703        else:
1704            self.assertFalse(
1705                sock.getsockopt(
1706                    socket.SOL_SOCKET, socket.SO_REUSEADDR))
1707        if reuseport_supported:
1708            self.assertFalse(
1709                sock.getsockopt(
1710                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1711        self.assertFalse(
1712            sock.getsockopt(
1713                socket.SOL_SOCKET, socket.SO_BROADCAST))
1714
1715        transport.close()
1716        self.loop.run_until_complete(protocol.done)
1717        self.assertEqual('CLOSED', protocol.state)
1718
1719        coro = self.loop.create_datagram_endpoint(
1720            lambda: MyDatagramProto(create_future=True, loop=self.loop),
1721            local_addr=('127.0.0.1', 0),
1722            reuse_address=True,
1723            reuse_port=reuseport_supported,
1724            allow_broadcast=True)
1725        transport, protocol = self.loop.run_until_complete(coro)
1726        sock = transport.get_extra_info('socket')
1727
1728        self.assertTrue(
1729            sock.getsockopt(
1730                socket.SOL_SOCKET, socket.SO_REUSEADDR))
1731        if reuseport_supported:
1732            self.assertTrue(
1733                sock.getsockopt(
1734                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
1735        self.assertTrue(
1736            sock.getsockopt(
1737                socket.SOL_SOCKET, socket.SO_BROADCAST))
1738
1739        transport.close()
1740        self.loop.run_until_complete(protocol.done)
1741        self.assertEqual('CLOSED', protocol.state)
1742
1743    @patch_socket
1744    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
1745        del m_socket.SO_REUSEPORT
1746        m_socket.socket.return_value = mock.Mock()
1747
1748        coro = self.loop.create_datagram_endpoint(
1749            lambda: MyDatagramProto(loop=self.loop),
1750            local_addr=('127.0.0.1', 0),
1751            reuse_address=False,
1752            reuse_port=True)
1753
1754        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
1755
1756    @patch_socket
1757    def test_create_datagram_endpoint_ip_addr(self, m_socket):
1758        def getaddrinfo(*args, **kw):
1759            self.fail('should not have called getaddrinfo')
1760
1761        m_socket.getaddrinfo = getaddrinfo
1762        m_socket.socket.return_value.bind = bind = mock.Mock()
1763        self.loop._add_reader = mock.Mock()
1764        self.loop._add_reader._is_coroutine = False
1765
1766        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
1767        coro = self.loop.create_datagram_endpoint(
1768            lambda: MyDatagramProto(loop=self.loop),
1769            local_addr=('1.2.3.4', 0),
1770            reuse_address=False,
1771            reuse_port=reuseport_supported)
1772
1773        t, p = self.loop.run_until_complete(coro)
1774        try:
1775            bind.assert_called_with(('1.2.3.4', 0))
1776            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
1777                                               proto=m_socket.IPPROTO_UDP,
1778                                               type=m_socket.SOCK_DGRAM)
1779        finally:
1780            t.close()
1781            test_utils.run_briefly(self.loop)  # allow transport to close
1782
1783    def test_accept_connection_retry(self):
1784        sock = mock.Mock()
1785        sock.accept.side_effect = BlockingIOError()
1786
1787        self.loop._accept_connection(MyProto, sock)
1788        self.assertFalse(sock.close.called)
1789
1790    @mock.patch('asyncio.base_events.logger')
1791    def test_accept_connection_exception(self, m_log):
1792        sock = mock.Mock()
1793        sock.fileno.return_value = 10
1794        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
1795        self.loop._remove_reader = mock.Mock()
1796        self.loop.call_later = mock.Mock()
1797
1798        self.loop._accept_connection(MyProto, sock)
1799        self.assertTrue(m_log.error.called)
1800        self.assertFalse(sock.close.called)
1801        self.loop._remove_reader.assert_called_with(10)
1802        self.loop.call_later.assert_called_with(
1803            constants.ACCEPT_RETRY_DELAY,
1804            # self.loop._start_serving
1805            mock.ANY,
1806            MyProto, sock, None, None, mock.ANY, mock.ANY)
1807
1808    def test_call_coroutine(self):
1809        @asyncio.coroutine
1810        def simple_coroutine():
1811            pass
1812
1813        self.loop.set_debug(True)
1814        coro_func = simple_coroutine
1815        coro_obj = coro_func()
1816        self.addCleanup(coro_obj.close)
1817        for func in (coro_func, coro_obj):
1818            with self.assertRaises(TypeError):
1819                self.loop.call_soon(func)
1820            with self.assertRaises(TypeError):
1821                self.loop.call_soon_threadsafe(func)
1822            with self.assertRaises(TypeError):
1823                self.loop.call_later(60, func)
1824            with self.assertRaises(TypeError):
1825                self.loop.call_at(self.loop.time() + 60, func)
1826            with self.assertRaises(TypeError):
1827                self.loop.run_until_complete(
1828                    self.loop.run_in_executor(None, func))
1829
1830    @mock.patch('asyncio.base_events.logger')
1831    def test_log_slow_callbacks(self, m_logger):
1832        def stop_loop_cb(loop):
1833            loop.stop()
1834
1835        @asyncio.coroutine
1836        def stop_loop_coro(loop):
1837            yield from ()
1838            loop.stop()
1839
1840        asyncio.set_event_loop(self.loop)
1841        self.loop.set_debug(True)
1842        self.loop.slow_callback_duration = 0.0
1843
1844        # slow callback
1845        self.loop.call_soon(stop_loop_cb, self.loop)
1846        self.loop.run_forever()
1847        fmt, *args = m_logger.warning.call_args[0]
1848        self.assertRegex(fmt % tuple(args),
1849                         "^Executing <Handle.*stop_loop_cb.*> "
1850                         "took .* seconds$")
1851
1852        # slow task
1853        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
1854        self.loop.run_forever()
1855        fmt, *args = m_logger.warning.call_args[0]
1856        self.assertRegex(fmt % tuple(args),
1857                         "^Executing <Task.*stop_loop_coro.*> "
1858                         "took .* seconds$")
1859
1860
1861class RunningLoopTests(unittest.TestCase):
1862
1863    def test_running_loop_within_a_loop(self):
1864        @asyncio.coroutine
1865        def runner(loop):
1866            loop.run_forever()
1867
1868        loop = asyncio.new_event_loop()
1869        outer_loop = asyncio.new_event_loop()
1870        try:
1871            with self.assertRaisesRegex(RuntimeError,
1872                                        'while another loop is running'):
1873                outer_loop.run_until_complete(runner(loop))
1874        finally:
1875            loop.close()
1876            outer_loop.close()
1877
1878
1879class BaseLoopSockSendfileTests(test_utils.TestCase):
1880
1881    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
1882
1883    class MyProto(asyncio.Protocol):
1884
1885        def __init__(self, loop):
1886            self.started = False
1887            self.closed = False
1888            self.data = bytearray()
1889            self.fut = loop.create_future()
1890            self.transport = None
1891
1892        def connection_made(self, transport):
1893            self.started = True
1894            self.transport = transport
1895
1896        def data_received(self, data):
1897            self.data.extend(data)
1898
1899        def connection_lost(self, exc):
1900            self.closed = True
1901            self.fut.set_result(None)
1902            self.transport = None
1903
1904        async def wait_closed(self):
1905            await self.fut
1906
1907    @classmethod
1908    def setUpClass(cls):
1909        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
1910        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
1911        with open(support.TESTFN, 'wb') as fp:
1912            fp.write(cls.DATA)
1913        super().setUpClass()
1914
1915    @classmethod
1916    def tearDownClass(cls):
1917        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
1918        support.unlink(support.TESTFN)
1919        super().tearDownClass()
1920
1921    def setUp(self):
1922        from asyncio.selector_events import BaseSelectorEventLoop
1923        # BaseSelectorEventLoop() has no native implementation
1924        self.loop = BaseSelectorEventLoop()
1925        self.set_event_loop(self.loop)
1926        self.file = open(support.TESTFN, 'rb')
1927        self.addCleanup(self.file.close)
1928        super().setUp()
1929
1930    def make_socket(self, blocking=False):
1931        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1932        sock.setblocking(blocking)
1933        self.addCleanup(sock.close)
1934        return sock
1935
1936    def run_loop(self, coro):
1937        return self.loop.run_until_complete(coro)
1938
1939    def prepare(self):
1940        sock = self.make_socket()
1941        proto = self.MyProto(self.loop)
1942        server = self.run_loop(self.loop.create_server(
1943            lambda: proto, support.HOST, 0, family=socket.AF_INET))
1944        addr = server.sockets[0].getsockname()
1945
1946        for _ in range(10):
1947            try:
1948                self.run_loop(self.loop.sock_connect(sock, addr))
1949            except OSError:
1950                self.run_loop(asyncio.sleep(0.5))
1951                continue
1952            else:
1953                break
1954        else:
1955            # One last try, so we get the exception
1956            self.run_loop(self.loop.sock_connect(sock, addr))
1957
1958        def cleanup():
1959            server.close()
1960            self.run_loop(server.wait_closed())
1961            sock.close()
1962            if proto.transport is not None:
1963                proto.transport.close()
1964                self.run_loop(proto.wait_closed())
1965
1966        self.addCleanup(cleanup)
1967
1968        return sock, proto
1969
1970    def test__sock_sendfile_native_failure(self):
1971        sock, proto = self.prepare()
1972
1973        with self.assertRaisesRegex(events.SendfileNotAvailableError,
1974                                    "sendfile is not available"):
1975            self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
1976                                                          0, None))
1977
1978        self.assertEqual(proto.data, b'')
1979        self.assertEqual(self.file.tell(), 0)
1980
1981    def test_sock_sendfile_no_fallback(self):
1982        sock, proto = self.prepare()
1983
1984        with self.assertRaisesRegex(events.SendfileNotAvailableError,
1985                                    "sendfile is not available"):
1986            self.run_loop(self.loop.sock_sendfile(sock, self.file,
1987                                                  fallback=False))
1988
1989        self.assertEqual(self.file.tell(), 0)
1990        self.assertEqual(proto.data, b'')
1991
1992    def test_sock_sendfile_fallback(self):
1993        sock, proto = self.prepare()
1994
1995        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
1996        sock.close()
1997        self.run_loop(proto.wait_closed())
1998
1999        self.assertEqual(ret, len(self.DATA))
2000        self.assertEqual(self.file.tell(), len(self.DATA))
2001        self.assertEqual(proto.data, self.DATA)
2002
2003    def test_sock_sendfile_fallback_offset_and_count(self):
2004        sock, proto = self.prepare()
2005
2006        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
2007                                                    1000, 2000))
2008        sock.close()
2009        self.run_loop(proto.wait_closed())
2010
2011        self.assertEqual(ret, 2000)
2012        self.assertEqual(self.file.tell(), 3000)
2013        self.assertEqual(proto.data, self.DATA[1000:3000])
2014
2015    def test_blocking_socket(self):
2016        self.loop.set_debug(True)
2017        sock = self.make_socket(blocking=True)
2018        with self.assertRaisesRegex(ValueError, "must be non-blocking"):
2019            self.run_loop(self.loop.sock_sendfile(sock, self.file))
2020
2021    def test_nonbinary_file(self):
2022        sock = self.make_socket()
2023        with open(support.TESTFN, 'r') as f:
2024            with self.assertRaisesRegex(ValueError, "binary mode"):
2025                self.run_loop(self.loop.sock_sendfile(sock, f))
2026
2027    def test_nonstream_socket(self):
2028        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2029        sock.setblocking(False)
2030        self.addCleanup(sock.close)
2031        with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
2032            self.run_loop(self.loop.sock_sendfile(sock, self.file))
2033
2034    def test_notint_count(self):
2035        sock = self.make_socket()
2036        with self.assertRaisesRegex(TypeError,
2037                                    "count must be a positive integer"):
2038            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))
2039
2040    def test_negative_count(self):
2041        sock = self.make_socket()
2042        with self.assertRaisesRegex(ValueError,
2043                                    "count must be a positive integer"):
2044            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))
2045
2046    def test_notint_offset(self):
2047        sock = self.make_socket()
2048        with self.assertRaisesRegex(TypeError,
2049                                    "offset must be a non-negative integer"):
2050            self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))
2051
2052    def test_negative_offset(self):
2053        sock = self.make_socket()
2054        with self.assertRaisesRegex(ValueError,
2055                                    "offset must be a non-negative integer"):
2056            self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
2057
2058
2059class TestSelectorUtils(test_utils.TestCase):
2060    def check_set_nodelay(self, sock):
2061        opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
2062        self.assertFalse(opt)
2063
2064        base_events._set_nodelay(sock)
2065
2066        opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
2067        self.assertTrue(opt)
2068
2069    @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
2070                         'need socket.TCP_NODELAY')
2071    def test_set_nodelay(self):
2072        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
2073                             proto=socket.IPPROTO_TCP)
2074        with sock:
2075            self.check_set_nodelay(sock)
2076
2077        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
2078                             proto=socket.IPPROTO_TCP)
2079        with sock:
2080            sock.setblocking(False)
2081            self.check_set_nodelay(sock)
2082
2083
2084
2085if __name__ == '__main__':
2086    unittest.main()
2087