• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
# Public API of this module (a one-element tuple).
__all__ = 'BaseEventLoop',


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5


# True when the socket module exposes the AF_INET6 address family.
_HAS_IPv6 = hasattr(socket, 'AF_INET6')

# Maximum timeout passed to select to avoid OS limitations
# (24 * 3600, i.e. one day if the unit is seconds).
MAXIMUM_SELECT_TIMEOUT = 24 * 3600

# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
# *reuse_address* parameter.  A unique sentinel object, so it can be told
# apart from any value a caller could pass.
_unset = object()
72
73
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    ``tasks.Task``), the repr of that task is returned; any other handle
    is rendered with ``str()``.
    """
    callback = handle._callback
    owner = getattr(callback, '__self__', None)
    if not isinstance(owner, tasks.Task):
        return str(handle)
    # The callback belongs to a task: describe the task itself.
    return repr(callback.__self__)
81
82
def _format_pipe(fd):
    """Return a short label for a subprocess stream argument.

    ``subprocess.PIPE`` and ``subprocess.STDOUT`` get symbolic names;
    anything else is shown through ``repr()``.
    """
    if fd == subprocess.PIPE:
        label = '<pipe>'
    elif fd == subprocess.STDOUT:
        label = '<stdout>'
    else:
        label = repr(fd)
    return label
90
91
def _set_reuseport(sock):
    """Turn on the SO_REUSEPORT option for *sock*.

    Raises ValueError when the socket module does not define
    SO_REUSEPORT, or when setting the option fails with OSError.
    """
    # Guard first: without the constant there is nothing to set.
    if not hasattr(socket, 'SO_REUSEPORT'):
        raise ValueError('reuse_port not supported by socket module')

    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except OSError:
        # The constant exists but setting the option was rejected.
        raise ValueError('reuse_port not supported by socket module, '
                         'SO_REUSEPORT defined but not implemented.')
101
102
def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
    """Build an addrinfo tuple directly when *host* is already an IP.

    This lets callers skip getaddrinfo for hosts they resolved themselves.

    Returns ``(family, type, proto, '', sockaddr)`` where *sockaddr* is
    ``(host, port)`` or, for AF_INET6, ``(host, port, flowinfo, scopeid)``.
    Returns None whenever the shortcut does not apply: inet_pton is
    unavailable, *proto* or *type* is not a TCP/UDP one, *host* is None,
    contains ``%`` or is not an IP literal, or *port* is not numeric.
    """
    if not hasattr(socket, 'inet_pton'):
        return

    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
            host is None:
        return None

    # The protocol is derived from the socket type; other types are
    # left to getaddrinfo.
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    # A missing or empty port means port 0.
    if port is None or (isinstance(port, (bytes, str)) and not port):
        port = 0
    else:
        try:
            port = int(port)
        except (TypeError, ValueError):
            # A service name such as "http" needs getaddrinfo.
            return None

    if family == socket.AF_UNSPEC:
        candidates = [socket.AF_INET]
        if _HAS_IPv6:
            candidates.append(socket.AF_INET6)
    else:
        candidates = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in candidates:
        try:
            socket.inet_pton(af, host)
        except OSError:
            # Not a literal of this family; try the next one.
            continue
        # The host has already been resolved.
        if _HAS_IPv6 and af == socket.AF_INET6:
            return af, type, proto, '', (host, port, flowinfo, scopeid)
        return af, type, proto, '', (host, port)

    # "host" is not an IP address.
    return None
160
161
def _interleave_addrinfos(addrinfos, first_address_family_count=1):
    """Interleave list of addrinfo tuples by family.

    Addresses are grouped by their family (element 0 of each tuple),
    keeping first-seen family order and the original order inside each
    family.  The result takes one address from each family in turn.

    When *first_address_family_count* is greater than 1, that many
    addresses of the first family are placed at the front before the
    round-robin continues (count - 1 are moved up front, and the first
    interleaving round contributes one more).

    An empty *addrinfos* yields an empty list for any count.
    """
    # Group addresses by family; dicts preserve insertion order.
    by_family = {}
    for addr in addrinfos:
        by_family.setdefault(addr[0], []).append(addr)
    groups = list(by_family.values())
    if not groups:
        # Nothing to interleave; also avoids indexing groups[0] below.
        return []

    reordered = []
    if first_address_family_count > 1:
        head = first_address_family_count - 1
        reordered.extend(groups[0][:head])
        del groups[0][:head]
    # zip_longest pads exhausted families with None; drop the padding.
    reordered.extend(
        addr
        for round_ in itertools.zip_longest(*groups)
        for addr in round_
        if addr is not None)
    return reordered
182
183
def _run_until_complete_cb(fut):
    """Done-callback that stops the loop owning *fut*.

    The loop is left alone when the future finished with SystemExit or
    KeyboardInterrupt.
    """
    interrupted = (
        not fut.cancelled()
        and isinstance(fut.exception(), (SystemExit, KeyboardInterrupt)))
    if interrupted:
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    futures._get_loop(fut).stop()
192
193
if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        """Enable TCP_NODELAY on *sock* if it is a TCP stream socket.

        Sockets that are not AF_INET/AF_INET6, not SOCK_STREAM or not
        IPPROTO_TCP are left untouched.
        """
        if sock.family not in {socket.AF_INET, socket.AF_INET6}:
            return
        if not sock.type == socket.SOCK_STREAM:
            return
        if not sock.proto == socket.IPPROTO_TCP:
            return
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        """Do nothing: the socket module has no TCP_NODELAY."""
        pass
203
204
class _SendfileFallbackProtocol(protocols.Protocol):
    """Stand-in protocol installed on a transport during a fallback send.

    On construction it remembers the transport's current protocol and
    whether reading/writing were active, pauses reading and installs
    itself as the transport's protocol.  restore() puts everything back.
    While installed it tracks write flow control through a future that
    drain() awaits.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        # Snapshot the transport's state before it is modified below.
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # If writing was already paused, start with a pending future so
        # drain() waits for resume_writing().
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until writing is allowed again.

        Raises ConnectionError if the transport is closing.
        """
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        waiter = self._write_ready_fut
        if waiter is not None:
            await waiter

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        """Fail a pending drain() and forward the event to the real protocol."""
        waiter = self._write_ready_fut
        if waiter is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            error = exc
            if error is None:
                error = ConnectionError("Connection is closed by peer")
            waiter.set_exception(error)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        """Create the future drain() waits on, unless one already exists."""
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        """Release a waiting drain(), if any."""
        waiter = self._write_ready_fut
        if waiter is not None:
            waiter.set_result(False)
            self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and its read/write state."""
        transport = self._transport
        original = self._proto
        transport.set_protocol(original)
        if self._should_resume_reading:
            transport.resume_reading()
        pending = self._write_ready_fut
        if pending is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            pending.cancel()
        if self._should_resume_writing:
            original.resume_writing()
271
272
class Server(events.AbstractServer):
    """Server object that listens on a set of sockets for an event loop.

    The sockets are put into listening mode and handed to the loop's
    _start_serving() when serving starts, and passed to the loop's
    _stop_serving() by close().  wait_closed() callers are tracked as
    futures in _waiters and released by _wakeup().
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        # Set to None by close(); None therefore means "closed".
        self._sockets = sockets
        # Incremented by _attach(), decremented by _detach().
        self._active_count = 0
        # Futures created by wait_closed(); replaced by None in _wakeup().
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        # Future awaited by serve_forever(), or None when nobody awaits it.
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        """Count one more active user of the server (must not be closed)."""
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        """Drop one active user; wake waiters once closed and unused."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future.

        Calling this again after the waiters were already released is a
        no-op.
        """
        waiters = self._waiters
        if waiters is None:
            return
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # Resolve with None rather than with the waiter itself,
                # so the future does not end up referencing itself.
                waiter.set_result(None)

    def _start_serving(self):
        """Start listening on all sockets; does nothing if already serving."""
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        """Return the event loop this server belongs to."""
        return self._loop

    def is_serving(self):
        """Return True while the server is serving."""
        return self._serving

    @property
    def sockets(self):
        """Tuple of TransportSocket wrappers, empty once closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop serving on every socket; calling it again is a no-op.

        A pending serve_forever() future is cancelled.  Waiters are
        woken immediately only when no user is attached; otherwise the
        last _detach() wakes them.
        """
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        """Start serving, then yield to the loop once."""
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        """Serve until cancelled.

        Raises RuntimeError if serve_forever() is already being awaited
        or if the server is closed.  On cancellation the server is
        closed and wait_closed() is awaited before the CancelledError
        is re-raised.
        """
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                # Always propagate the cancellation, even if closing failed.
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until _wakeup() releases the waiters.

        Returns immediately when the sockets are already gone or the
        waiters were already released.
        """
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
382
383
384class BaseEventLoop(events.AbstractEventLoop):
385
    def __init__(self):
        """Initialise the loop in its not-running, not-closed state."""
        # NOTE(review): presumably counts cancelled timer handles still
        # sitting in _scheduled -- confirm against the code that uses it.
        self._timer_cancelled_count = 0
        # Set to True by close(); tested by _check_closed().
        self._closed = False
        # Set by stop(); run_forever() leaves its loop once it is true.
        self._stopping = False
        # Handles queued by _call_soon(), in FIFO order.
        self._ready = collections.deque()
        # Heap (see heapq use in call_at()) of timer handles.
        self._scheduled = []
        # Created lazily by run_in_executor() or set explicitly with
        # set_default_executor().
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        # Called after _thread_id is set; set_debug() is defined outside
        # this view, so keep this ordering unless it is checked.
        self.set_debug(coroutines._is_debug_mode())
        # In debug mode, if the execution of a callback or a step of a task
        # exceed this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        # None selects the default tasks.Task in create_task().
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
        # Set to True when `loop.shutdown_default_executor` is called.
        self._executor_shutdown_called = False
415
416    def __repr__(self):
417        return (
418            f'<{self.__class__.__name__} running={self.is_running()} '
419            f'closed={self.is_closed()} debug={self.get_debug()}>'
420        )
421
422    def create_future(self):
423        """Create a Future object attached to the loop."""
424        return futures.Future(loop=self)
425
426    def create_task(self, coro, *, name=None):
427        """Schedule a coroutine object.
428
429        Return a task object.
430        """
431        self._check_closed()
432        if self._task_factory is None:
433            task = tasks.Task(coro, loop=self, name=name)
434            if task._source_traceback:
435                del task._source_traceback[-1]
436        else:
437            task = self._task_factory(self, coro)
438            tasks._set_task_name(task, name)
439
440        return task
441
442    def set_task_factory(self, factory):
443        """Set a task factory that will be used by loop.create_task().
444
445        If factory is None the default task factory will be set.
446
447        If factory is a callable, it should have a signature matching
448        '(loop, coro)', where 'loop' will be a reference to the active
449        event loop, 'coro' will be a coroutine object.  The callable
450        must return a Future.
451        """
452        if factory is not None and not callable(factory):
453            raise TypeError('task factory must be a callable or None')
454        self._task_factory = factory
455
456    def get_task_factory(self):
457        """Return a task factory, or None if the default one is in use."""
458        return self._task_factory
459
460    def _make_socket_transport(self, sock, protocol, waiter=None, *,
461                               extra=None, server=None):
462        """Create socket transport."""
463        raise NotImplementedError
464
465    def _make_ssl_transport(
466            self, rawsock, protocol, sslcontext, waiter=None,
467            *, server_side=False, server_hostname=None,
468            extra=None, server=None,
469            ssl_handshake_timeout=None,
470            call_connection_made=True):
471        """Create SSL transport."""
472        raise NotImplementedError
473
474    def _make_datagram_transport(self, sock, protocol,
475                                 address=None, waiter=None, extra=None):
476        """Create datagram transport."""
477        raise NotImplementedError
478
479    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
480                                  extra=None):
481        """Create read pipe transport."""
482        raise NotImplementedError
483
484    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
485                                   extra=None):
486        """Create write pipe transport."""
487        raise NotImplementedError
488
489    async def _make_subprocess_transport(self, protocol, args, shell,
490                                         stdin, stdout, stderr, bufsize,
491                                         extra=None, **kwargs):
492        """Create subprocess transport."""
493        raise NotImplementedError
494
495    def _write_to_self(self):
496        """Write a byte to self-pipe, to wake up the event loop.
497
498        This may be called from a different thread.
499
500        The subclass is responsible for implementing the self-pipe.
501        """
502        raise NotImplementedError
503
504    def _process_events(self, event_list):
505        """Process selector events."""
506        raise NotImplementedError
507
508    def _check_closed(self):
509        if self._closed:
510            raise RuntimeError('Event loop is closed')
511
512    def _check_default_executor(self):
513        if self._executor_shutdown_called:
514            raise RuntimeError('Executor shutdown has been called')
515
516    def _asyncgen_finalizer_hook(self, agen):
517        self._asyncgens.discard(agen)
518        if not self.is_closed():
519            self.call_soon_threadsafe(self.create_task, agen.aclose())
520
521    def _asyncgen_firstiter_hook(self, agen):
522        if self._asyncgens_shutdown_called:
523            warnings.warn(
524                f"asynchronous generator {agen!r} was scheduled after "
525                f"loop.shutdown_asyncgens() call",
526                ResourceWarning, source=self)
527
528        self._asyncgens.add(agen)
529
530    async def shutdown_asyncgens(self):
531        """Shutdown all active asynchronous generators."""
532        self._asyncgens_shutdown_called = True
533
534        if not len(self._asyncgens):
535            # If Python version is <3.6 or we don't have any asynchronous
536            # generators alive.
537            return
538
539        closing_agens = list(self._asyncgens)
540        self._asyncgens.clear()
541
542        results = await tasks.gather(
543            *[ag.aclose() for ag in closing_agens],
544            return_exceptions=True)
545
546        for result, agen in zip(results, closing_agens):
547            if isinstance(result, Exception):
548                self.call_exception_handler({
549                    'message': f'an error occurred during closing of '
550                               f'asynchronous generator {agen!r}',
551                    'exception': result,
552                    'asyncgen': agen
553                })
554
555    async def shutdown_default_executor(self):
556        """Schedule the shutdown of the default executor."""
557        self._executor_shutdown_called = True
558        if self._default_executor is None:
559            return
560        future = self.create_future()
561        thread = threading.Thread(target=self._do_shutdown, args=(future,))
562        thread.start()
563        try:
564            await future
565        finally:
566            thread.join()
567
568    def _do_shutdown(self, future):
569        try:
570            self._default_executor.shutdown(wait=True)
571            self.call_soon_threadsafe(future.set_result, None)
572        except Exception as ex:
573            self.call_soon_threadsafe(future.set_exception, ex)
574
575    def _check_running(self):
576        if self.is_running():
577            raise RuntimeError('This event loop is already running')
578        if events._get_running_loop() is not None:
579            raise RuntimeError(
580                'Cannot run the event loop while another loop is running')
581
    def run_forever(self):
        """Run until stop() is called.

        Raises RuntimeError if the loop is closed, already running, or
        if another loop is running.  While running, this loop's
        asynchronous-generator hooks are installed; the previous hooks
        and the not-running state are restored on exit, even when an
        iteration raises.
        """
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        # From here on is_running() reports True.
        self._thread_id = threading.get_ident()

        # Remember the current hooks so they can be reinstated on exit.
        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                # stop() only sets a flag; it is honoured after a full
                # iteration has completed.
                if self._stopping:
                    break
        finally:
            # Undo everything set up above.
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
604
    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.

        Raises RuntimeError if the loop is closed or already running,
        or if the loop was stopped before the future completed.
        """
        self._check_closed()
        self._check_running()

        # True when the argument is not already a future, i.e. a new
        # task is created for it below.
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        # The callback stops the loop when the future completes.
        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            # Deliberately bare: the exception is always re-raised below.
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
642
643    def stop(self):
644        """Stop running the event loop.
645
646        Every callback already scheduled will still run.  This simply informs
647        run_forever to stop looping after a complete iteration.
648        """
649        self._stopping = True
650
651    def close(self):
652        """Close the event loop.
653
654        This clears the queues and shuts down the executor,
655        but does not wait for the executor to finish.
656
657        The event loop must not be running.
658        """
659        if self.is_running():
660            raise RuntimeError("Cannot close a running event loop")
661        if self._closed:
662            return
663        if self._debug:
664            logger.debug("Close %r", self)
665        self._closed = True
666        self._ready.clear()
667        self._scheduled.clear()
668        self._executor_shutdown_called = True
669        executor = self._default_executor
670        if executor is not None:
671            self._default_executor = None
672            executor.shutdown(wait=False)
673
674    def is_closed(self):
675        """Returns True if the event loop was closed."""
676        return self._closed
677
678    def __del__(self, _warn=warnings.warn):
679        if not self.is_closed():
680            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
681            if not self.is_running():
682                self.close()
683
684    def is_running(self):
685        """Returns True if the event loop is running."""
686        return (self._thread_id is not None)
687
688    def time(self):
689        """Return the time according to the event loop's clock.
690
691        This is a float expressed in seconds since an epoch, but the
692        epoch, precision, accuracy and drift are unspecified and may
693        differ per event loop.
694        """
695        return time.monotonic()
696
697    def call_later(self, delay, callback, *args, context=None):
698        """Arrange for a callback to be called at a given time.
699
700        Return a Handle: an opaque object with a cancel() method that
701        can be used to cancel the call.
702
703        The delay can be an int or float, expressed in seconds.  It is
704        always relative to the current time.
705
706        Each callback will be called exactly once.  If two callbacks
707        are scheduled for exactly the same time, it undefined which
708        will be called first.
709
710        Any positional arguments after the callback will be passed to
711        the callback when it is called.
712        """
713        timer = self.call_at(self.time() + delay, callback, *args,
714                             context=context)
715        if timer._source_traceback:
716            del timer._source_traceback[-1]
717        return timer
718
719    def call_at(self, when, callback, *args, context=None):
720        """Like call_later(), but uses an absolute time.
721
722        Absolute time corresponds to the event loop's time() method.
723        """
724        self._check_closed()
725        if self._debug:
726            self._check_thread()
727            self._check_callback(callback, 'call_at')
728        timer = events.TimerHandle(when, callback, args, self, context)
729        if timer._source_traceback:
730            del timer._source_traceback[-1]
731        heapq.heappush(self._scheduled, timer)
732        timer._scheduled = True
733        return timer
734
735    def call_soon(self, callback, *args, context=None):
736        """Arrange for a callback to be called as soon as possible.
737
738        This operates as a FIFO queue: callbacks are called in the
739        order in which they are registered.  Each callback will be
740        called exactly once.
741
742        Any positional arguments after the callback will be passed to
743        the callback when it is called.
744        """
745        self._check_closed()
746        if self._debug:
747            self._check_thread()
748            self._check_callback(callback, 'call_soon')
749        handle = self._call_soon(callback, args, context)
750        if handle._source_traceback:
751            del handle._source_traceback[-1]
752        return handle
753
754    def _check_callback(self, callback, method):
755        if (coroutines.iscoroutine(callback) or
756                coroutines.iscoroutinefunction(callback)):
757            raise TypeError(
758                f"coroutines cannot be used with {method}()")
759        if not callable(callback):
760            raise TypeError(
761                f'a callable object was expected by {method}(), '
762                f'got {callback!r}')
763
764    def _call_soon(self, callback, args, context):
765        handle = events.Handle(callback, args, self, context)
766        if handle._source_traceback:
767            del handle._source_traceback[-1]
768        self._ready.append(handle)
769        return handle
770
771    def _check_thread(self):
772        """Check that the current thread is the thread running the event loop.
773
774        Non-thread-safe methods of this class make this assumption and will
775        likely behave incorrectly when the assumption is violated.
776
777        Should only be called when (self._debug == True).  The caller is
778        responsible for checking this condition for performance reasons.
779        """
780        if self._thread_id is None:
781            return
782        thread_id = threading.get_ident()
783        if thread_id != self._thread_id:
784            raise RuntimeError(
785                "Non-thread-safe operation invoked on an event loop other "
786                "than the current one")
787
788    def call_soon_threadsafe(self, callback, *args, context=None):
789        """Like call_soon(), but thread-safe."""
790        self._check_closed()
791        if self._debug:
792            self._check_callback(callback, 'call_soon_threadsafe')
793        handle = self._call_soon(callback, args, context)
794        if handle._source_traceback:
795            del handle._source_traceback[-1]
796        self._write_to_self()
797        return handle
798
799    def run_in_executor(self, executor, func, *args):
800        self._check_closed()
801        if self._debug:
802            self._check_callback(func, 'run_in_executor')
803        if executor is None:
804            executor = self._default_executor
805            # Only check when the default executor is being used
806            self._check_default_executor()
807            if executor is None:
808                executor = concurrent.futures.ThreadPoolExecutor(
809                    thread_name_prefix='asyncio'
810                )
811                self._default_executor = executor
812        return futures.wrap_future(
813            executor.submit(func, *args), loop=self)
814
815    def set_default_executor(self, executor):
816        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
817            warnings.warn(
818                'Using the default executor that is not an instance of '
819                'ThreadPoolExecutor is deprecated and will be prohibited '
820                'in Python 3.9',
821                DeprecationWarning, 2)
822        self._default_executor = executor
823
824    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
825        msg = [f"{host}:{port!r}"]
826        if family:
827            msg.append(f'family={family!r}')
828        if type:
829            msg.append(f'type={type!r}')
830        if proto:
831            msg.append(f'proto={proto!r}')
832        if flags:
833            msg.append(f'flags={flags!r}')
834        msg = ', '.join(msg)
835        logger.debug('Get address info %s', msg)
836
837        t0 = self.time()
838        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
839        dt = self.time() - t0
840
841        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
842        if dt >= self.slow_callback_duration:
843            logger.info(msg)
844        else:
845            logger.debug(msg)
846        return addrinfo
847
848    async def getaddrinfo(self, host, port, *,
849                          family=0, type=0, proto=0, flags=0):
850        if self._debug:
851            getaddr_func = self._getaddrinfo_debug
852        else:
853            getaddr_func = socket.getaddrinfo
854
855        return await self.run_in_executor(
856            None, getaddr_func, host, port, family, type, proto, flags)
857
858    async def getnameinfo(self, sockaddr, flags=0):
859        return await self.run_in_executor(
860            None, socket.getnameinfo, sockaddr, flags)
861
862    async def sock_sendfile(self, sock, file, offset=0, count=None,
863                            *, fallback=True):
864        if self._debug and sock.gettimeout() != 0:
865            raise ValueError("the socket must be non-blocking")
866        self._check_sendfile_params(sock, file, offset, count)
867        try:
868            return await self._sock_sendfile_native(sock, file,
869                                                    offset, count)
870        except exceptions.SendfileNotAvailableError as exc:
871            if not fallback:
872                raise
873        return await self._sock_sendfile_fallback(sock, file,
874                                                  offset, count)
875
876    async def _sock_sendfile_native(self, sock, file, offset, count):
877        # NB: sendfile syscall is not supported for SSL sockets and
878        # non-mmap files even if sendfile is supported by OS
879        raise exceptions.SendfileNotAvailableError(
880            f"syscall sendfile is not available for socket {sock!r} "
881            "and file {file!r} combination")
882
883    async def _sock_sendfile_fallback(self, sock, file, offset, count):
884        if offset:
885            file.seek(offset)
886        blocksize = (
887            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
888            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
889        )
890        buf = bytearray(blocksize)
891        total_sent = 0
892        try:
893            while True:
894                if count:
895                    blocksize = min(count - total_sent, blocksize)
896                    if blocksize <= 0:
897                        break
898                view = memoryview(buf)[:blocksize]
899                read = await self.run_in_executor(None, file.readinto, view)
900                if not read:
901                    break  # EOF
902                await self.sock_sendall(sock, view[:read])
903                total_sent += read
904            return total_sent
905        finally:
906            if total_sent > 0 and hasattr(file, 'seek'):
907                file.seek(offset + total_sent)
908
909    def _check_sendfile_params(self, sock, file, offset, count):
910        if 'b' not in getattr(file, 'mode', 'b'):
911            raise ValueError("file should be opened in binary mode")
912        if not sock.type == socket.SOCK_STREAM:
913            raise ValueError("only SOCK_STREAM type sockets are supported")
914        if count is not None:
915            if not isinstance(count, int):
916                raise TypeError(
917                    "count must be a positive integer (got {!r})".format(count))
918            if count <= 0:
919                raise ValueError(
920                    "count must be a positive integer (got {!r})".format(count))
921        if not isinstance(offset, int):
922            raise TypeError(
923                "offset must be a non-negative integer (got {!r})".format(
924                    offset))
925        if offset < 0:
926            raise ValueError(
927                "offset must be a non-negative integer (got {!r})".format(
928                    offset))
929
930    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
931        """Create, bind and connect one socket."""
932        my_exceptions = []
933        exceptions.append(my_exceptions)
934        family, type_, proto, _, address = addr_info
935        sock = None
936        try:
937            sock = socket.socket(family=family, type=type_, proto=proto)
938            sock.setblocking(False)
939            if local_addr_infos is not None:
940                for _, _, _, _, laddr in local_addr_infos:
941                    try:
942                        sock.bind(laddr)
943                        break
944                    except OSError as exc:
945                        msg = (
946                            f'error while attempting to bind on '
947                            f'address {laddr!r}: '
948                            f'{exc.strerror.lower()}'
949                        )
950                        exc = OSError(exc.errno, msg)
951                        my_exceptions.append(exc)
952                else:  # all bind attempts failed
953                    raise my_exceptions.pop()
954            await self.sock_connect(sock, address)
955            return sock
956        except OSError as exc:
957            my_exceptions.append(exc)
958            if sock is not None:
959                sock.close()
960            raise
961        except:
962            if sock is not None:
963                sock.close()
964            raise
965
    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0,
            proto=0, flags=0, sock=None,
            local_addr=None, server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background.  When successful, the coroutine returns a
        (transport, protocol) pair.

        Either host/port or an existing *sock* must be given, not both
        (ValueError otherwise).  *server_hostname* and
        *ssl_handshake_timeout* are rejected with ValueError unless *ssl*
        is truthy.  If *local_addr* is given it is resolved and passed to
        _connect_sock() as the candidate bind addresses.  Passing
        *happy_eyeballs_delay* switches from sequential connection
        attempts to staggered.staggered_race(); *interleave* then defaults
        to 1.

        If every connection attempt fails, OSError is raised: the single
        collected exception, or one combining all distinct messages.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Resolve the remote address (skipped by _ensure_resolved()
            # when host is already a usable IP address).
            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                # Resolve the local address with the same hints.
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            # NOTE: this local list shadows the module-level "exceptions"
            # import for the remainder of the function.  _connect_sock()
            # appends one sub-list of errors per attempt.
            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        # Already recorded in "exceptions"; try the next
                        # address.
                        continue
            else:  # using happy eyeballs
                # Only the first item of the returned 3-tuple (used as
                # the socket) is needed here.
                sock, _, _ = await staggered.staggered_race(
                    (functools.partial(self._connect_sock,
                                       exceptions, addrinfo, laddr_infos)
                     for addrinfo in infos),
                    happy_eyeballs_delay, loop=self)

            if sock is None:
                # Flatten the per-attempt sub-lists into one list.
                exceptions = [exc for sub in exceptions for exc in sub]
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method, breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        # Wrap the connected socket in a (possibly SSL) transport.
        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol
1090
1091    async def _create_connection_transport(
1092            self, sock, protocol_factory, ssl,
1093            server_hostname, server_side=False,
1094            ssl_handshake_timeout=None):
1095
1096        sock.setblocking(False)
1097
1098        protocol = protocol_factory()
1099        waiter = self.create_future()
1100        if ssl:
1101            sslcontext = None if isinstance(ssl, bool) else ssl
1102            transport = self._make_ssl_transport(
1103                sock, protocol, sslcontext, waiter,
1104                server_side=server_side, server_hostname=server_hostname,
1105                ssl_handshake_timeout=ssl_handshake_timeout)
1106        else:
1107            transport = self._make_socket_transport(sock, protocol, waiter)
1108
1109        try:
1110            await waiter
1111        except:
1112            transport.close()
1113            raise
1114
1115        return transport, protocol
1116
1117    async def sendfile(self, transport, file, offset=0, count=None,
1118                       *, fallback=True):
1119        """Send a file to transport.
1120
1121        Return the total number of bytes which were sent.
1122
1123        The method uses high-performance os.sendfile if available.
1124
1125        file must be a regular file object opened in binary mode.
1126
1127        offset tells from where to start reading the file. If specified,
1128        count is the total number of bytes to transmit as opposed to
1129        sending the file until EOF is reached. File position is updated on
1130        return or also in case of error in which case file.tell()
1131        can be used to figure out the number of bytes
1132        which were sent.
1133
1134        fallback set to True makes asyncio to manually read and send
1135        the file when the platform does not support the sendfile syscall
1136        (e.g. Windows or SSL socket on Unix).
1137
1138        Raise SendfileNotAvailableError if the system does not support
1139        sendfile syscall and fallback is False.
1140        """
1141        if transport.is_closing():
1142            raise RuntimeError("Transport is closing")
1143        mode = getattr(transport, '_sendfile_compatible',
1144                       constants._SendfileMode.UNSUPPORTED)
1145        if mode is constants._SendfileMode.UNSUPPORTED:
1146            raise RuntimeError(
1147                f"sendfile is not supported for transport {transport!r}")
1148        if mode is constants._SendfileMode.TRY_NATIVE:
1149            try:
1150                return await self._sendfile_native(transport, file,
1151                                                   offset, count)
1152            except exceptions.SendfileNotAvailableError as exc:
1153                if not fallback:
1154                    raise
1155
1156        if not fallback:
1157            raise RuntimeError(
1158                f"fallback is disabled and native sendfile is not "
1159                f"supported for transport {transport!r}")
1160
1161        return await self._sendfile_fallback(transport, file,
1162                                             offset, count)
1163
1164    async def _sendfile_native(self, transp, file, offset, count):
1165        raise exceptions.SendfileNotAvailableError(
1166            "sendfile syscall is not supported")
1167
1168    async def _sendfile_fallback(self, transp, file, offset, count):
1169        if offset:
1170            file.seek(offset)
1171        blocksize = min(count, 16384) if count else 16384
1172        buf = bytearray(blocksize)
1173        total_sent = 0
1174        proto = _SendfileFallbackProtocol(transp)
1175        try:
1176            while True:
1177                if count:
1178                    blocksize = min(count - total_sent, blocksize)
1179                    if blocksize <= 0:
1180                        return total_sent
1181                view = memoryview(buf)[:blocksize]
1182                read = await self.run_in_executor(None, file.readinto, view)
1183                if not read:
1184                    return total_sent  # EOF
1185                await proto.drain()
1186                transp.write(view[:read])
1187                total_sent += read
1188        finally:
1189            if total_sent > 0 and hasattr(file, 'seek'):
1190                file.seek(offset + total_sent)
1191            await proto.restore()
1192
1193    async def start_tls(self, transport, protocol, sslcontext, *,
1194                        server_side=False,
1195                        server_hostname=None,
1196                        ssl_handshake_timeout=None):
1197        """Upgrade transport to TLS.
1198
1199        Return a new transport that *protocol* should start using
1200        immediately.
1201        """
1202        if ssl is None:
1203            raise RuntimeError('Python ssl module is not available')
1204
1205        if not isinstance(sslcontext, ssl.SSLContext):
1206            raise TypeError(
1207                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1208                f'got {sslcontext!r}')
1209
1210        if not getattr(transport, '_start_tls_compatible', False):
1211            raise TypeError(
1212                f'transport {transport!r} is not supported by start_tls()')
1213
1214        waiter = self.create_future()
1215        ssl_protocol = sslproto.SSLProtocol(
1216            self, protocol, sslcontext, waiter,
1217            server_side, server_hostname,
1218            ssl_handshake_timeout=ssl_handshake_timeout,
1219            call_connection_made=False)
1220
1221        # Pause early so that "ssl_protocol.data_received()" doesn't
1222        # have a chance to get called before "ssl_protocol.connection_made()".
1223        transport.pause_reading()
1224
1225        transport.set_protocol(ssl_protocol)
1226        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1227        resume_cb = self.call_soon(transport.resume_reading)
1228
1229        try:
1230            await waiter
1231        except BaseException:
1232            transport.close()
1233            conmade_cb.cancel()
1234            resume_cb.cancel()
1235            raise
1236
1237        return ssl_protocol._app_transport
1238
    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=_unset, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Create datagram connection.

        Either pass an existing SOCK_DGRAM *sock* (no other socket
        modifier arguments may then be truthy), or let this method
        create the socket from *local_addr* / *remote_addr* / *family*.
        With neither address given, *family* must be non-zero.  For
        AF_UNIX both addresses must be strings; otherwise each given
        address must be a 2-tuple, which is resolved with SOCK_DGRAM
        hints.

        Passing a truthy *reuse_address* raises ValueError; passing any
        other explicit value emits a DeprecationWarning.

        Returns a (transport, protocol) pair.  If no candidate
        (family, proto) pair yields a usable socket, the first collected
        OSError is raised.
        """
        if sock is not None:
            if sock.type != socket.SOCK_DGRAM:
                raise ValueError(
                    f'A UDP Socket was expected, got {sock!r}')
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_address=reuse_address, reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
                raise ValueError(
                    f'socket modifier keyword arguments can not be used '
                    f'when sock is specified. ({problems})')
            sock.setblocking(False)
            # No remote address is passed to the transport for a
            # caller-supplied socket.
            r_addr = None
        else:
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                # Single candidate with no addresses to bind/connect.
                addr_pairs_info = (((family, proto), (None, None)),)
            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
                for addr in (local_addr, remote_addr):
                    if addr is not None and not isinstance(addr, str):
                        raise TypeError('string is expected')

                # Paths starting with 0 or '\x00' are left alone
                # (presumably abstract-namespace addresses with no
                # filesystem entry -- TODO confirm).
                if local_addr and local_addr[0] not in (0, '\x00'):
                    try:
                        # Remove a stale socket file left at local_addr.
                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                            os.remove(local_addr)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r',
                                     local_addr, err)

                addr_pairs_info = (((family, proto),
                                    (local_addr, remote_addr)), )
            else:
                # join address by (family, protocol)
                addr_infos = {}  # Using order preserving dict
                # idx 0 is the local slot, idx 1 the remote slot of each
                # [local, remote] pair.
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        assert isinstance(addr, tuple) and len(addr) == 2, (
                            '2-tuple is expected')

                        infos = await self._ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            # A later result for the same key overwrites
                            # the earlier one.
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            # NOTE: this local list shadows the module-level "exceptions"
            # import for the remainder of the function.
            exceptions = []

            # bpo-37228
            if reuse_address is not _unset:
                if reuse_address:
                    raise ValueError("Passing `reuse_address=True` is no "
                                     "longer supported, as the usage of "
                                     "SO_REUSEPORT in UDP poses a significant "
                                     "security concern.")
                else:
                    warnings.warn("The *reuse_address* parameter has been "
                                  "deprecated as of 3.5.10 and is scheduled "
                                  "for removal in 3.11.", DeprecationWarning,
                                  stacklevel=2)

            # Try each candidate in order; the first one that can be set
            # up without an OSError wins.  (The loop rebinds the "family"
            # and "proto" parameters.)
            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        # With allow_broadcast the socket is left
                        # unconnected; the remote address is only handed
                        # to the transport.
                        if not allow_broadcast:
                            await self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                # Every candidate failed with OSError.
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            await waiter
        except:
            # Close the transport on any failure, then propagate.
            transport.close()
            raise

        return transport, protocol
1385
1386    async def _ensure_resolved(self, address, *,
1387                               family=0, type=socket.SOCK_STREAM,
1388                               proto=0, flags=0, loop):
1389        host, port = address[:2]
1390        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1391        if info is not None:
1392            # "host" is already a resolved IP.
1393            return [info]
1394        else:
1395            return await loop.getaddrinfo(host, port, family=family, type=type,
1396                                          proto=proto, flags=flags)
1397
1398    async def _create_server_getaddrinfo(self, host, port, family, flags):
1399        infos = await self._ensure_resolved((host, port), family=family,
1400                                            type=socket.SOCK_STREAM,
1401                                            flags=flags, loop=self)
1402        if not infos:
1403            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1404        return infos
1405
1406    async def create_server(
1407            self, protocol_factory, host=None, port=None,
1408            *,
1409            family=socket.AF_UNSPEC,
1410            flags=socket.AI_PASSIVE,
1411            sock=None,
1412            backlog=100,
1413            ssl=None,
1414            reuse_address=None,
1415            reuse_port=None,
1416            ssl_handshake_timeout=None,
1417            start_serving=True):
1418        """Create a TCP server.
1419
1420        The host parameter can be a string, in that case the TCP server is
1421        bound to host and port.
1422
1423        The host parameter can also be a sequence of strings and in that case
1424        the TCP server is bound to all hosts of the sequence. If a host
1425        appears multiple times (possibly indirectly e.g. when hostnames
1426        resolve to the same IP address), the server is only bound once to that
1427        host.
1428
1429        Return a Server object which can be used to stop the service.
1430
1431        This method is a coroutine.
1432        """
1433        if isinstance(ssl, bool):
1434            raise TypeError('ssl argument must be an SSLContext or None')
1435
1436        if ssl_handshake_timeout is not None and ssl is None:
1437            raise ValueError(
1438                'ssl_handshake_timeout is only meaningful with ssl')
1439
1440        if host is not None or port is not None:
1441            if sock is not None:
1442                raise ValueError(
1443                    'host/port and sock can not be specified at the same time')
1444
1445            if reuse_address is None:
1446                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1447            sockets = []
1448            if host == '':
1449                hosts = [None]
1450            elif (isinstance(host, str) or
1451                  not isinstance(host, collections.abc.Iterable)):
1452                hosts = [host]
1453            else:
1454                hosts = host
1455
1456            fs = [self._create_server_getaddrinfo(host, port, family=family,
1457                                                  flags=flags)
1458                  for host in hosts]
1459            infos = await tasks.gather(*fs)
1460            infos = set(itertools.chain.from_iterable(infos))
1461
1462            completed = False
1463            try:
1464                for res in infos:
1465                    af, socktype, proto, canonname, sa = res
1466                    try:
1467                        sock = socket.socket(af, socktype, proto)
1468                    except socket.error:
1469                        # Assume it's a bad family/type/protocol combination.
1470                        if self._debug:
1471                            logger.warning('create_server() failed to create '
1472                                           'socket.socket(%r, %r, %r)',
1473                                           af, socktype, proto, exc_info=True)
1474                        continue
1475                    sockets.append(sock)
1476                    if reuse_address:
1477                        sock.setsockopt(
1478                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1479                    if reuse_port:
1480                        _set_reuseport(sock)
1481                    # Disable IPv4/IPv6 dual stack support (enabled by
1482                    # default on Linux) which makes a single socket
1483                    # listen on both address families.
1484                    if (_HAS_IPv6 and
1485                            af == socket.AF_INET6 and
1486                            hasattr(socket, 'IPPROTO_IPV6')):
1487                        sock.setsockopt(socket.IPPROTO_IPV6,
1488                                        socket.IPV6_V6ONLY,
1489                                        True)
1490                    try:
1491                        sock.bind(sa)
1492                    except OSError as err:
1493                        raise OSError(err.errno, 'error while attempting '
1494                                      'to bind on address %r: %s'
1495                                      % (sa, err.strerror.lower())) from None
1496                completed = True
1497            finally:
1498                if not completed:
1499                    for sock in sockets:
1500                        sock.close()
1501        else:
1502            if sock is None:
1503                raise ValueError('Neither host/port nor sock were specified')
1504            if sock.type != socket.SOCK_STREAM:
1505                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1506            sockets = [sock]
1507
1508        for sock in sockets:
1509            sock.setblocking(False)
1510
1511        server = Server(self, sockets, protocol_factory,
1512                        ssl, backlog, ssl_handshake_timeout)
1513        if start_serving:
1514            server._start_serving()
1515            # Skip one loop iteration so that all 'loop.add_reader'
1516            # go through.
1517            await tasks.sleep(0)
1518
1519        if self._debug:
1520            logger.info("%r is serving", server)
1521        return server
1522
1523    async def connect_accepted_socket(
1524            self, protocol_factory, sock,
1525            *, ssl=None,
1526            ssl_handshake_timeout=None):
1527        if sock.type != socket.SOCK_STREAM:
1528            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1529
1530        if ssl_handshake_timeout is not None and not ssl:
1531            raise ValueError(
1532                'ssl_handshake_timeout is only meaningful with ssl')
1533
1534        transport, protocol = await self._create_connection_transport(
1535            sock, protocol_factory, ssl, '', server_side=True,
1536            ssl_handshake_timeout=ssl_handshake_timeout)
1537        if self._debug:
1538            # Get the socket from the transport because SSL transport closes
1539            # the old socket and creates a new SSL socket
1540            sock = transport.get_extra_info('socket')
1541            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1542        return transport, protocol
1543
1544    async def connect_read_pipe(self, protocol_factory, pipe):
1545        protocol = protocol_factory()
1546        waiter = self.create_future()
1547        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1548
1549        try:
1550            await waiter
1551        except:
1552            transport.close()
1553            raise
1554
1555        if self._debug:
1556            logger.debug('Read pipe %r connected: (%r, %r)',
1557                         pipe.fileno(), transport, protocol)
1558        return transport, protocol
1559
1560    async def connect_write_pipe(self, protocol_factory, pipe):
1561        protocol = protocol_factory()
1562        waiter = self.create_future()
1563        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1564
1565        try:
1566            await waiter
1567        except:
1568            transport.close()
1569            raise
1570
1571        if self._debug:
1572            logger.debug('Write pipe %r connected: (%r, %r)',
1573                         pipe.fileno(), transport, protocol)
1574        return transport, protocol
1575
1576    def _log_subprocess(self, msg, stdin, stdout, stderr):
1577        info = [msg]
1578        if stdin is not None:
1579            info.append(f'stdin={_format_pipe(stdin)}')
1580        if stdout is not None and stderr == subprocess.STDOUT:
1581            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1582        else:
1583            if stdout is not None:
1584                info.append(f'stdout={_format_pipe(stdout)}')
1585            if stderr is not None:
1586                info.append(f'stderr={_format_pipe(stderr)}')
1587        logger.debug(' '.join(info))
1588
1589    async def subprocess_shell(self, protocol_factory, cmd, *,
1590                               stdin=subprocess.PIPE,
1591                               stdout=subprocess.PIPE,
1592                               stderr=subprocess.PIPE,
1593                               universal_newlines=False,
1594                               shell=True, bufsize=0,
1595                               encoding=None, errors=None, text=None,
1596                               **kwargs):
1597        if not isinstance(cmd, (bytes, str)):
1598            raise ValueError("cmd must be a string")
1599        if universal_newlines:
1600            raise ValueError("universal_newlines must be False")
1601        if not shell:
1602            raise ValueError("shell must be True")
1603        if bufsize != 0:
1604            raise ValueError("bufsize must be 0")
1605        if text:
1606            raise ValueError("text must be False")
1607        if encoding is not None:
1608            raise ValueError("encoding must be None")
1609        if errors is not None:
1610            raise ValueError("errors must be None")
1611
1612        protocol = protocol_factory()
1613        debug_log = None
1614        if self._debug:
1615            # don't log parameters: they may contain sensitive information
1616            # (password) and may be too long
1617            debug_log = 'run shell command %r' % cmd
1618            self._log_subprocess(debug_log, stdin, stdout, stderr)
1619        transport = await self._make_subprocess_transport(
1620            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1621        if self._debug and debug_log is not None:
1622            logger.info('%s: %r', debug_log, transport)
1623        return transport, protocol
1624
1625    async def subprocess_exec(self, protocol_factory, program, *args,
1626                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1627                              stderr=subprocess.PIPE, universal_newlines=False,
1628                              shell=False, bufsize=0,
1629                              encoding=None, errors=None, text=None,
1630                              **kwargs):
1631        if universal_newlines:
1632            raise ValueError("universal_newlines must be False")
1633        if shell:
1634            raise ValueError("shell must be False")
1635        if bufsize != 0:
1636            raise ValueError("bufsize must be 0")
1637        if text:
1638            raise ValueError("text must be False")
1639        if encoding is not None:
1640            raise ValueError("encoding must be None")
1641        if errors is not None:
1642            raise ValueError("errors must be None")
1643
1644        popen_args = (program,) + args
1645        protocol = protocol_factory()
1646        debug_log = None
1647        if self._debug:
1648            # don't log parameters: they may contain sensitive information
1649            # (password) and may be too long
1650            debug_log = f'execute program {program!r}'
1651            self._log_subprocess(debug_log, stdin, stdout, stderr)
1652        transport = await self._make_subprocess_transport(
1653            protocol, popen_args, False, stdin, stdout, stderr,
1654            bufsize, **kwargs)
1655        if self._debug and debug_log is not None:
1656            logger.info('%s: %r', debug_log, transport)
1657        return transport, protocol
1658
1659    def get_exception_handler(self):
1660        """Return an exception handler, or None if the default one is in use.
1661        """
1662        return self._exception_handler
1663
1664    def set_exception_handler(self, handler):
1665        """Set handler as the new event loop exception handler.
1666
1667        If handler is None, the default exception handler will
1668        be set.
1669
1670        If handler is a callable object, it should have a
1671        signature matching '(loop, context)', where 'loop'
1672        will be a reference to the active event loop, 'context'
1673        will be a dict object (see `call_exception_handler()`
1674        documentation for details about context).
1675        """
1676        if handler is not None and not callable(handler):
1677            raise TypeError(f'A callable object or None is expected, '
1678                            f'got {handler!r}')
1679        self._exception_handler = handler
1680
1681    def default_exception_handler(self, context):
1682        """Default exception handler.
1683
1684        This is called when an exception occurs and no exception
1685        handler is set, and can be called by a custom exception
1686        handler that wants to defer to the default behavior.
1687
1688        This default handler logs the error message and other
1689        context-dependent information.  In debug mode, a truncated
1690        stack trace is also appended showing where the given object
1691        (e.g. a handle or future or task) was created, if any.
1692
1693        The context parameter has the same meaning as in
1694        `call_exception_handler()`.
1695        """
1696        message = context.get('message')
1697        if not message:
1698            message = 'Unhandled exception in event loop'
1699
1700        exception = context.get('exception')
1701        if exception is not None:
1702            exc_info = (type(exception), exception, exception.__traceback__)
1703        else:
1704            exc_info = False
1705
1706        if ('source_traceback' not in context and
1707                self._current_handle is not None and
1708                self._current_handle._source_traceback):
1709            context['handle_traceback'] = \
1710                self._current_handle._source_traceback
1711
1712        log_lines = [message]
1713        for key in sorted(context):
1714            if key in {'message', 'exception'}:
1715                continue
1716            value = context[key]
1717            if key == 'source_traceback':
1718                tb = ''.join(traceback.format_list(value))
1719                value = 'Object created at (most recent call last):\n'
1720                value += tb.rstrip()
1721            elif key == 'handle_traceback':
1722                tb = ''.join(traceback.format_list(value))
1723                value = 'Handle created at (most recent call last):\n'
1724                value += tb.rstrip()
1725            else:
1726                value = repr(value)
1727            log_lines.append(f'{key}: {value}')
1728
1729        logger.error('\n'.join(log_lines), exc_info=exc_info)
1730
1731    def call_exception_handler(self, context):
1732        """Call the current event loop's exception handler.
1733
1734        The context argument is a dict containing the following keys:
1735
1736        - 'message': Error message;
1737        - 'exception' (optional): Exception object;
1738        - 'future' (optional): Future instance;
1739        - 'task' (optional): Task instance;
1740        - 'handle' (optional): Handle instance;
1741        - 'protocol' (optional): Protocol instance;
1742        - 'transport' (optional): Transport instance;
1743        - 'socket' (optional): Socket instance;
1744        - 'asyncgen' (optional): Asynchronous generator that caused
1745                                 the exception.
1746
1747        New keys maybe introduced in the future.
1748
1749        Note: do not overload this method in an event loop subclass.
1750        For custom exception handling, use the
1751        `set_exception_handler()` method.
1752        """
1753        if self._exception_handler is None:
1754            try:
1755                self.default_exception_handler(context)
1756            except (SystemExit, KeyboardInterrupt):
1757                raise
1758            except BaseException:
1759                # Second protection layer for unexpected errors
1760                # in the default implementation, as well as for subclassed
1761                # event loops with overloaded "default_exception_handler".
1762                logger.error('Exception in default exception handler',
1763                             exc_info=True)
1764        else:
1765            try:
1766                self._exception_handler(self, context)
1767            except (SystemExit, KeyboardInterrupt):
1768                raise
1769            except BaseException as exc:
1770                # Exception in the user set custom exception handler.
1771                try:
1772                    # Let's try default handler.
1773                    self.default_exception_handler({
1774                        'message': 'Unhandled error in exception handler',
1775                        'exception': exc,
1776                        'context': context,
1777                    })
1778                except (SystemExit, KeyboardInterrupt):
1779                    raise
1780                except BaseException:
1781                    # Guard 'default_exception_handler' in case it is
1782                    # overloaded.
1783                    logger.error('Exception in default exception handler '
1784                                 'while handling an unexpected error '
1785                                 'in custom exception handler',
1786                                 exc_info=True)
1787
1788    def _add_callback(self, handle):
1789        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1790        assert isinstance(handle, events.Handle), 'A Handle is required here'
1791        if handle._cancelled:
1792            return
1793        assert not isinstance(handle, events.TimerHandle)
1794        self._ready.append(handle)
1795
1796    def _add_callback_signalsafe(self, handle):
1797        """Like _add_callback() but called from a signal handler."""
1798        self._add_callback(handle)
1799        self._write_to_self()
1800
1801    def _timer_handle_cancelled(self, handle):
1802        """Notification that a TimerHandle has been cancelled."""
1803        if handle._scheduled:
1804            self._timer_cancelled_count += 1
1805
1806    def _run_once(self):
1807        """Run one full iteration of the event loop.
1808
1809        This calls all currently ready callbacks, polls for I/O,
1810        schedules the resulting callbacks, and finally schedules
1811        'call_later' callbacks.
1812        """
1813
1814        sched_count = len(self._scheduled)
1815        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1816            self._timer_cancelled_count / sched_count >
1817                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1818            # Remove delayed calls that were cancelled if their number
1819            # is too high
1820            new_scheduled = []
1821            for handle in self._scheduled:
1822                if handle._cancelled:
1823                    handle._scheduled = False
1824                else:
1825                    new_scheduled.append(handle)
1826
1827            heapq.heapify(new_scheduled)
1828            self._scheduled = new_scheduled
1829            self._timer_cancelled_count = 0
1830        else:
1831            # Remove delayed calls that were cancelled from head of queue.
1832            while self._scheduled and self._scheduled[0]._cancelled:
1833                self._timer_cancelled_count -= 1
1834                handle = heapq.heappop(self._scheduled)
1835                handle._scheduled = False
1836
1837        timeout = None
1838        if self._ready or self._stopping:
1839            timeout = 0
1840        elif self._scheduled:
1841            # Compute the desired timeout.
1842            when = self._scheduled[0]._when
1843            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1844
1845        event_list = self._selector.select(timeout)
1846        self._process_events(event_list)
1847
1848        # Handle 'later' callbacks that are ready.
1849        end_time = self.time() + self._clock_resolution
1850        while self._scheduled:
1851            handle = self._scheduled[0]
1852            if handle._when >= end_time:
1853                break
1854            handle = heapq.heappop(self._scheduled)
1855            handle._scheduled = False
1856            self._ready.append(handle)
1857
1858        # This is the only place where callbacks are actually *called*.
1859        # All other places just add them to ready.
1860        # Note: We run all currently scheduled callbacks, but not any
1861        # callbacks scheduled by callbacks run this time around --
1862        # they will be run the next time (after another I/O poll).
1863        # Use an idiom that is thread-safe without using locks.
1864        ntodo = len(self._ready)
1865        for i in range(ntodo):
1866            handle = self._ready.popleft()
1867            if handle._cancelled:
1868                continue
1869            if self._debug:
1870                try:
1871                    self._current_handle = handle
1872                    t0 = self.time()
1873                    handle._run()
1874                    dt = self.time() - t0
1875                    if dt >= self.slow_callback_duration:
1876                        logger.warning('Executing %s took %.3f seconds',
1877                                       _format_handle(handle), dt)
1878                finally:
1879                    self._current_handle = None
1880            else:
1881                handle._run()
1882        handle = None  # Needed to break cycles when an exception occurs.
1883
1884    def _set_coroutine_origin_tracking(self, enabled):
1885        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1886            return
1887
1888        if enabled:
1889            self._coroutine_origin_tracking_saved_depth = (
1890                sys.get_coroutine_origin_tracking_depth())
1891            sys.set_coroutine_origin_tracking_depth(
1892                constants.DEBUG_STACK_DEPTH)
1893        else:
1894            sys.set_coroutine_origin_tracking_depth(
1895                self._coroutine_origin_tracking_saved_depth)
1896
1897        self._coroutine_origin_tracking_enabled = enabled
1898
1899    def get_debug(self):
1900        return self._debug
1901
1902    def set_debug(self, enabled):
1903        self._debug = enabled
1904
1905        if self.is_running():
1906            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1907