• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
52__all__ = 'BaseEventLoop',
53
54
55# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
72
73
74def _format_handle(handle):
75    cb = handle._callback
76    if isinstance(getattr(cb, '__self__', None), tasks.Task):
77        # format the task
78        return repr(cb.__self__)
79    else:
80        return str(handle)
81
82
83def _format_pipe(fd):
84    if fd == subprocess.PIPE:
85        return '<pipe>'
86    elif fd == subprocess.STDOUT:
87        return '<stdout>'
88    else:
89        return repr(fd)
90
91
92def _set_reuseport(sock):
93    if not hasattr(socket, 'SO_REUSEPORT'):
94        raise ValueError('reuse_port not supported by socket module')
95    else:
96        try:
97            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98        except OSError:
99            raise ValueError('reuse_port not supported by socket module, '
100                             'SO_REUSEPORT defined but not implemented.')
101
102
103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
104    # Try to skip getaddrinfo if "host" is already an IP. Users might have
105    # handled name resolution in their own code and pass in resolved IPs.
106    if not hasattr(socket, 'inet_pton'):
107        return
108
109    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110            host is None:
111        return None
112
113    if type == socket.SOCK_STREAM:
114        proto = socket.IPPROTO_TCP
115    elif type == socket.SOCK_DGRAM:
116        proto = socket.IPPROTO_UDP
117    else:
118        return None
119
120    if port is None:
121        port = 0
122    elif isinstance(port, bytes) and port == b'':
123        port = 0
124    elif isinstance(port, str) and port == '':
125        port = 0
126    else:
127        # If port's a service name like "http", don't skip getaddrinfo.
128        try:
129            port = int(port)
130        except (TypeError, ValueError):
131            return None
132
133    if family == socket.AF_UNSPEC:
134        afs = [socket.AF_INET]
135        if _HAS_IPv6:
136            afs.append(socket.AF_INET6)
137    else:
138        afs = [family]
139
140    if isinstance(host, bytes):
141        host = host.decode('idna')
142    if '%' in host:
143        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144        # like '::1%lo0'.
145        return None
146
147    for af in afs:
148        try:
149            socket.inet_pton(af, host)
150            # The host has already been resolved.
151            if _HAS_IPv6 and af == socket.AF_INET6:
152                return af, type, proto, '', (host, port, flowinfo, scopeid)
153            else:
154                return af, type, proto, '', (host, port)
155        except OSError:
156            pass
157
158    # "host" is not an IP address.
159    return None
160
161
162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163    """Interleave list of addrinfo tuples by family."""
164    # Group addresses by family
165    addrinfos_by_family = collections.OrderedDict()
166    for addr in addrinfos:
167        family = addr[0]
168        if family not in addrinfos_by_family:
169            addrinfos_by_family[family] = []
170        addrinfos_by_family[family].append(addr)
171    addrinfos_lists = list(addrinfos_by_family.values())
172
173    reordered = []
174    if first_address_family_count > 1:
175        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176        del addrinfos_lists[0][:first_address_family_count - 1]
177    reordered.extend(
178        a for a in itertools.chain.from_iterable(
179            itertools.zip_longest(*addrinfos_lists)
180        ) if a is not None)
181    return reordered
182
183
184def _run_until_complete_cb(fut):
185    if not fut.cancelled():
186        exc = fut.exception()
187        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
188            # Issue #22429: run_forever() already finished, no need to
189            # stop it.
190            return
191    futures._get_loop(fut).stop()
192
193
194if hasattr(socket, 'TCP_NODELAY'):
195    def _set_nodelay(sock):
196        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197                sock.type == socket.SOCK_STREAM and
198                sock.proto == socket.IPPROTO_TCP):
199            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201    def _set_nodelay(sock):
202        pass
203
204
205class _SendfileFallbackProtocol(protocols.Protocol):
206    def __init__(self, transp):
207        if not isinstance(transp, transports._FlowControlMixin):
208            raise TypeError("transport should be _FlowControlMixin instance")
209        self._transport = transp
210        self._proto = transp.get_protocol()
211        self._should_resume_reading = transp.is_reading()
212        self._should_resume_writing = transp._protocol_paused
213        transp.pause_reading()
214        transp.set_protocol(self)
215        if self._should_resume_writing:
216            self._write_ready_fut = self._transport._loop.create_future()
217        else:
218            self._write_ready_fut = None
219
220    async def drain(self):
221        if self._transport.is_closing():
222            raise ConnectionError("Connection closed by peer")
223        fut = self._write_ready_fut
224        if fut is None:
225            return
226        await fut
227
228    def connection_made(self, transport):
229        raise RuntimeError("Invalid state: "
230                           "connection should have been established already.")
231
232    def connection_lost(self, exc):
233        if self._write_ready_fut is not None:
234            # Never happens if peer disconnects after sending the whole content
235            # Thus disconnection is always an exception from user perspective
236            if exc is None:
237                self._write_ready_fut.set_exception(
238                    ConnectionError("Connection is closed by peer"))
239            else:
240                self._write_ready_fut.set_exception(exc)
241        self._proto.connection_lost(exc)
242
243    def pause_writing(self):
244        if self._write_ready_fut is not None:
245            return
246        self._write_ready_fut = self._transport._loop.create_future()
247
248    def resume_writing(self):
249        if self._write_ready_fut is None:
250            return
251        self._write_ready_fut.set_result(False)
252        self._write_ready_fut = None
253
254    def data_received(self, data):
255        raise RuntimeError("Invalid state: reading should be paused")
256
257    def eof_received(self):
258        raise RuntimeError("Invalid state: reading should be paused")
259
260    async def restore(self):
261        self._transport.set_protocol(self._proto)
262        if self._should_resume_reading:
263            self._transport.resume_reading()
264        if self._write_ready_fut is not None:
265            # Cancel the future.
266            # Basically it has no effect because protocol is switched back,
267            # no code should wait for it anymore.
268            self._write_ready_fut.cancel()
269        if self._should_resume_writing:
270            self._proto.resume_writing()
271
272
273class Server(events.AbstractServer):
274
275    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
276                 ssl_handshake_timeout):
277        self._loop = loop
278        self._sockets = sockets
279        self._active_count = 0
280        self._waiters = []
281        self._protocol_factory = protocol_factory
282        self._backlog = backlog
283        self._ssl_context = ssl_context
284        self._ssl_handshake_timeout = ssl_handshake_timeout
285        self._serving = False
286        self._serving_forever_fut = None
287
288    def __repr__(self):
289        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
290
291    def _attach(self):
292        assert self._sockets is not None
293        self._active_count += 1
294
295    def _detach(self):
296        assert self._active_count > 0
297        self._active_count -= 1
298        if self._active_count == 0 and self._sockets is None:
299            self._wakeup()
300
301    def _wakeup(self):
302        waiters = self._waiters
303        self._waiters = None
304        for waiter in waiters:
305            if not waiter.done():
306                waiter.set_result(waiter)
307
308    def _start_serving(self):
309        if self._serving:
310            return
311        self._serving = True
312        for sock in self._sockets:
313            sock.listen(self._backlog)
314            self._loop._start_serving(
315                self._protocol_factory, sock, self._ssl_context,
316                self, self._backlog, self._ssl_handshake_timeout)
317
318    def get_loop(self):
319        return self._loop
320
321    def is_serving(self):
322        return self._serving
323
324    @property
325    def sockets(self):
326        if self._sockets is None:
327            return ()
328        return tuple(trsock.TransportSocket(s) for s in self._sockets)
329
330    def close(self):
331        sockets = self._sockets
332        if sockets is None:
333            return
334        self._sockets = None
335
336        for sock in sockets:
337            self._loop._stop_serving(sock)
338
339        self._serving = False
340
341        if (self._serving_forever_fut is not None and
342                not self._serving_forever_fut.done()):
343            self._serving_forever_fut.cancel()
344            self._serving_forever_fut = None
345
346        if self._active_count == 0:
347            self._wakeup()
348
349    async def start_serving(self):
350        self._start_serving()
351        # Skip one loop iteration so that all 'loop.add_reader'
352        # go through.
353        await tasks.sleep(0, loop=self._loop)
354
355    async def serve_forever(self):
356        if self._serving_forever_fut is not None:
357            raise RuntimeError(
358                f'server {self!r} is already being awaited on serve_forever()')
359        if self._sockets is None:
360            raise RuntimeError(f'server {self!r} is closed')
361
362        self._start_serving()
363        self._serving_forever_fut = self._loop.create_future()
364
365        try:
366            await self._serving_forever_fut
367        except exceptions.CancelledError:
368            try:
369                self.close()
370                await self.wait_closed()
371            finally:
372                raise
373        finally:
374            self._serving_forever_fut = None
375
376    async def wait_closed(self):
377        if self._sockets is None or self._waiters is None:
378            return
379        waiter = self._loop.create_future()
380        self._waiters.append(waiter)
381        await waiter
382
383
384class BaseEventLoop(events.AbstractEventLoop):
385
386    def __init__(self):
387        self._timer_cancelled_count = 0
388        self._closed = False
389        self._stopping = False
390        self._ready = collections.deque()
391        self._scheduled = []
392        self._default_executor = None
393        self._internal_fds = 0
394        # Identifier of the thread running the event loop, or None if the
395        # event loop is not running
396        self._thread_id = None
397        self._clock_resolution = time.get_clock_info('monotonic').resolution
398        self._exception_handler = None
399        self.set_debug(coroutines._is_debug_mode())
400        # In debug mode, if the execution of a callback or a step of a task
401        # exceed this duration in seconds, the slow callback/task is logged.
402        self.slow_callback_duration = 0.1
403        self._current_handle = None
404        self._task_factory = None
405        self._coroutine_origin_tracking_enabled = False
406        self._coroutine_origin_tracking_saved_depth = None
407
408        # A weak set of all asynchronous generators that are
409        # being iterated by the loop.
410        self._asyncgens = weakref.WeakSet()
411        # Set to True when `loop.shutdown_asyncgens` is called.
412        self._asyncgens_shutdown_called = False
413        # Set to True when `loop.shutdown_default_executor` is called.
414        self._executor_shutdown_called = False
415
416    def __repr__(self):
417        return (
418            f'<{self.__class__.__name__} running={self.is_running()} '
419            f'closed={self.is_closed()} debug={self.get_debug()}>'
420        )
421
422    def create_future(self):
423        """Create a Future object attached to the loop."""
424        return futures.Future(loop=self)
425
426    def create_task(self, coro, *, name=None):
427        """Schedule a coroutine object.
428
429        Return a task object.
430        """
431        self._check_closed()
432        if self._task_factory is None:
433            task = tasks.Task(coro, loop=self, name=name)
434            if task._source_traceback:
435                del task._source_traceback[-1]
436        else:
437            task = self._task_factory(self, coro)
438            tasks._set_task_name(task, name)
439
440        return task
441
442    def set_task_factory(self, factory):
443        """Set a task factory that will be used by loop.create_task().
444
445        If factory is None the default task factory will be set.
446
447        If factory is a callable, it should have a signature matching
448        '(loop, coro)', where 'loop' will be a reference to the active
449        event loop, 'coro' will be a coroutine object.  The callable
450        must return a Future.
451        """
452        if factory is not None and not callable(factory):
453            raise TypeError('task factory must be a callable or None')
454        self._task_factory = factory
455
456    def get_task_factory(self):
457        """Return a task factory, or None if the default one is in use."""
458        return self._task_factory
459
460    def _make_socket_transport(self, sock, protocol, waiter=None, *,
461                               extra=None, server=None):
462        """Create socket transport."""
463        raise NotImplementedError
464
465    def _make_ssl_transport(
466            self, rawsock, protocol, sslcontext, waiter=None,
467            *, server_side=False, server_hostname=None,
468            extra=None, server=None,
469            ssl_handshake_timeout=None,
470            call_connection_made=True):
471        """Create SSL transport."""
472        raise NotImplementedError
473
474    def _make_datagram_transport(self, sock, protocol,
475                                 address=None, waiter=None, extra=None):
476        """Create datagram transport."""
477        raise NotImplementedError
478
479    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
480                                  extra=None):
481        """Create read pipe transport."""
482        raise NotImplementedError
483
484    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
485                                   extra=None):
486        """Create write pipe transport."""
487        raise NotImplementedError
488
489    async def _make_subprocess_transport(self, protocol, args, shell,
490                                         stdin, stdout, stderr, bufsize,
491                                         extra=None, **kwargs):
492        """Create subprocess transport."""
493        raise NotImplementedError
494
495    def _write_to_self(self):
496        """Write a byte to self-pipe, to wake up the event loop.
497
498        This may be called from a different thread.
499
500        The subclass is responsible for implementing the self-pipe.
501        """
502        raise NotImplementedError
503
504    def _process_events(self, event_list):
505        """Process selector events."""
506        raise NotImplementedError
507
508    def _check_closed(self):
509        if self._closed:
510            raise RuntimeError('Event loop is closed')
511
512    def _check_default_executor(self):
513        if self._executor_shutdown_called:
514            raise RuntimeError('Executor shutdown has been called')
515
516    def _asyncgen_finalizer_hook(self, agen):
517        self._asyncgens.discard(agen)
518        if not self.is_closed():
519            self.call_soon_threadsafe(self.create_task, agen.aclose())
520
521    def _asyncgen_firstiter_hook(self, agen):
522        if self._asyncgens_shutdown_called:
523            warnings.warn(
524                f"asynchronous generator {agen!r} was scheduled after "
525                f"loop.shutdown_asyncgens() call",
526                ResourceWarning, source=self)
527
528        self._asyncgens.add(agen)
529
530    async def shutdown_asyncgens(self):
531        """Shutdown all active asynchronous generators."""
532        self._asyncgens_shutdown_called = True
533
534        if not len(self._asyncgens):
535            # If Python version is <3.6 or we don't have any asynchronous
536            # generators alive.
537            return
538
539        closing_agens = list(self._asyncgens)
540        self._asyncgens.clear()
541
542        results = await tasks.gather(
543            *[ag.aclose() for ag in closing_agens],
544            return_exceptions=True,
545            loop=self)
546
547        for result, agen in zip(results, closing_agens):
548            if isinstance(result, Exception):
549                self.call_exception_handler({
550                    'message': f'an error occurred during closing of '
551                               f'asynchronous generator {agen!r}',
552                    'exception': result,
553                    'asyncgen': agen
554                })
555
556    async def shutdown_default_executor(self):
557        """Schedule the shutdown of the default executor."""
558        self._executor_shutdown_called = True
559        if self._default_executor is None:
560            return
561        future = self.create_future()
562        thread = threading.Thread(target=self._do_shutdown, args=(future,))
563        thread.start()
564        try:
565            await future
566        finally:
567            thread.join()
568
569    def _do_shutdown(self, future):
570        try:
571            self._default_executor.shutdown(wait=True)
572            self.call_soon_threadsafe(future.set_result, None)
573        except Exception as ex:
574            self.call_soon_threadsafe(future.set_exception, ex)
575
576    def _check_running(self):
577        if self.is_running():
578            raise RuntimeError('This event loop is already running')
579        if events._get_running_loop() is not None:
580            raise RuntimeError(
581                'Cannot run the event loop while another loop is running')
582
583    def run_forever(self):
584        """Run until stop() is called."""
585        self._check_closed()
586        self._check_running()
587        self._set_coroutine_origin_tracking(self._debug)
588        self._thread_id = threading.get_ident()
589
590        old_agen_hooks = sys.get_asyncgen_hooks()
591        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
592                               finalizer=self._asyncgen_finalizer_hook)
593        try:
594            events._set_running_loop(self)
595            while True:
596                self._run_once()
597                if self._stopping:
598                    break
599        finally:
600            self._stopping = False
601            self._thread_id = None
602            events._set_running_loop(None)
603            self._set_coroutine_origin_tracking(False)
604            sys.set_asyncgen_hooks(*old_agen_hooks)
605
606    def run_until_complete(self, future):
607        """Run until the Future is done.
608
609        If the argument is a coroutine, it is wrapped in a Task.
610
611        WARNING: It would be disastrous to call run_until_complete()
612        with the same coroutine twice -- it would wrap it in two
613        different Tasks and that can't be good.
614
615        Return the Future's result, or raise its exception.
616        """
617        self._check_closed()
618        self._check_running()
619
620        new_task = not futures.isfuture(future)
621        future = tasks.ensure_future(future, loop=self)
622        if new_task:
623            # An exception is raised if the future didn't complete, so there
624            # is no need to log the "destroy pending task" message
625            future._log_destroy_pending = False
626
627        future.add_done_callback(_run_until_complete_cb)
628        try:
629            self.run_forever()
630        except:
631            if new_task and future.done() and not future.cancelled():
632                # The coroutine raised a BaseException. Consume the exception
633                # to not log a warning, the caller doesn't have access to the
634                # local task.
635                future.exception()
636            raise
637        finally:
638            future.remove_done_callback(_run_until_complete_cb)
639        if not future.done():
640            raise RuntimeError('Event loop stopped before Future completed.')
641
642        return future.result()
643
644    def stop(self):
645        """Stop running the event loop.
646
647        Every callback already scheduled will still run.  This simply informs
648        run_forever to stop looping after a complete iteration.
649        """
650        self._stopping = True
651
652    def close(self):
653        """Close the event loop.
654
655        This clears the queues and shuts down the executor,
656        but does not wait for the executor to finish.
657
658        The event loop must not be running.
659        """
660        if self.is_running():
661            raise RuntimeError("Cannot close a running event loop")
662        if self._closed:
663            return
664        if self._debug:
665            logger.debug("Close %r", self)
666        self._closed = True
667        self._ready.clear()
668        self._scheduled.clear()
669        self._executor_shutdown_called = True
670        executor = self._default_executor
671        if executor is not None:
672            self._default_executor = None
673            executor.shutdown(wait=False)
674
675    def is_closed(self):
676        """Returns True if the event loop was closed."""
677        return self._closed
678
679    def __del__(self, _warn=warnings.warn):
680        if not self.is_closed():
681            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
682            if not self.is_running():
683                self.close()
684
685    def is_running(self):
686        """Returns True if the event loop is running."""
687        return (self._thread_id is not None)
688
689    def time(self):
690        """Return the time according to the event loop's clock.
691
692        This is a float expressed in seconds since an epoch, but the
693        epoch, precision, accuracy and drift are unspecified and may
694        differ per event loop.
695        """
696        return time.monotonic()
697
698    def call_later(self, delay, callback, *args, context=None):
699        """Arrange for a callback to be called at a given time.
700
701        Return a Handle: an opaque object with a cancel() method that
702        can be used to cancel the call.
703
704        The delay can be an int or float, expressed in seconds.  It is
705        always relative to the current time.
706
707        Each callback will be called exactly once.  If two callbacks
708        are scheduled for exactly the same time, it undefined which
709        will be called first.
710
711        Any positional arguments after the callback will be passed to
712        the callback when it is called.
713        """
714        timer = self.call_at(self.time() + delay, callback, *args,
715                             context=context)
716        if timer._source_traceback:
717            del timer._source_traceback[-1]
718        return timer
719
720    def call_at(self, when, callback, *args, context=None):
721        """Like call_later(), but uses an absolute time.
722
723        Absolute time corresponds to the event loop's time() method.
724        """
725        self._check_closed()
726        if self._debug:
727            self._check_thread()
728            self._check_callback(callback, 'call_at')
729        timer = events.TimerHandle(when, callback, args, self, context)
730        if timer._source_traceback:
731            del timer._source_traceback[-1]
732        heapq.heappush(self._scheduled, timer)
733        timer._scheduled = True
734        return timer
735
736    def call_soon(self, callback, *args, context=None):
737        """Arrange for a callback to be called as soon as possible.
738
739        This operates as a FIFO queue: callbacks are called in the
740        order in which they are registered.  Each callback will be
741        called exactly once.
742
743        Any positional arguments after the callback will be passed to
744        the callback when it is called.
745        """
746        self._check_closed()
747        if self._debug:
748            self._check_thread()
749            self._check_callback(callback, 'call_soon')
750        handle = self._call_soon(callback, args, context)
751        if handle._source_traceback:
752            del handle._source_traceback[-1]
753        return handle
754
755    def _check_callback(self, callback, method):
756        if (coroutines.iscoroutine(callback) or
757                coroutines.iscoroutinefunction(callback)):
758            raise TypeError(
759                f"coroutines cannot be used with {method}()")
760        if not callable(callback):
761            raise TypeError(
762                f'a callable object was expected by {method}(), '
763                f'got {callback!r}')
764
765    def _call_soon(self, callback, args, context):
766        handle = events.Handle(callback, args, self, context)
767        if handle._source_traceback:
768            del handle._source_traceback[-1]
769        self._ready.append(handle)
770        return handle
771
772    def _check_thread(self):
773        """Check that the current thread is the thread running the event loop.
774
775        Non-thread-safe methods of this class make this assumption and will
776        likely behave incorrectly when the assumption is violated.
777
778        Should only be called when (self._debug == True).  The caller is
779        responsible for checking this condition for performance reasons.
780        """
781        if self._thread_id is None:
782            return
783        thread_id = threading.get_ident()
784        if thread_id != self._thread_id:
785            raise RuntimeError(
786                "Non-thread-safe operation invoked on an event loop other "
787                "than the current one")
788
789    def call_soon_threadsafe(self, callback, *args, context=None):
790        """Like call_soon(), but thread-safe."""
791        self._check_closed()
792        if self._debug:
793            self._check_callback(callback, 'call_soon_threadsafe')
794        handle = self._call_soon(callback, args, context)
795        if handle._source_traceback:
796            del handle._source_traceback[-1]
797        self._write_to_self()
798        return handle
799
800    def run_in_executor(self, executor, func, *args):
801        self._check_closed()
802        if self._debug:
803            self._check_callback(func, 'run_in_executor')
804        if executor is None:
805            executor = self._default_executor
806            # Only check when the default executor is being used
807            self._check_default_executor()
808            if executor is None:
809                executor = concurrent.futures.ThreadPoolExecutor(
810                    thread_name_prefix='asyncio'
811                )
812                self._default_executor = executor
813        return futures.wrap_future(
814            executor.submit(func, *args), loop=self)
815
816    def set_default_executor(self, executor):
817        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
818            warnings.warn(
819                'Using the default executor that is not an instance of '
820                'ThreadPoolExecutor is deprecated and will be prohibited '
821                'in Python 3.9',
822                DeprecationWarning, 2)
823        self._default_executor = executor
824
825    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
826        msg = [f"{host}:{port!r}"]
827        if family:
828            msg.append(f'family={family!r}')
829        if type:
830            msg.append(f'type={type!r}')
831        if proto:
832            msg.append(f'proto={proto!r}')
833        if flags:
834            msg.append(f'flags={flags!r}')
835        msg = ', '.join(msg)
836        logger.debug('Get address info %s', msg)
837
838        t0 = self.time()
839        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
840        dt = self.time() - t0
841
842        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
843        if dt >= self.slow_callback_duration:
844            logger.info(msg)
845        else:
846            logger.debug(msg)
847        return addrinfo
848
849    async def getaddrinfo(self, host, port, *,
850                          family=0, type=0, proto=0, flags=0):
851        if self._debug:
852            getaddr_func = self._getaddrinfo_debug
853        else:
854            getaddr_func = socket.getaddrinfo
855
856        return await self.run_in_executor(
857            None, getaddr_func, host, port, family, type, proto, flags)
858
859    async def getnameinfo(self, sockaddr, flags=0):
860        return await self.run_in_executor(
861            None, socket.getnameinfo, sockaddr, flags)
862
863    async def sock_sendfile(self, sock, file, offset=0, count=None,
864                            *, fallback=True):
865        if self._debug and sock.gettimeout() != 0:
866            raise ValueError("the socket must be non-blocking")
867        self._check_sendfile_params(sock, file, offset, count)
868        try:
869            return await self._sock_sendfile_native(sock, file,
870                                                    offset, count)
871        except exceptions.SendfileNotAvailableError as exc:
872            if not fallback:
873                raise
874        return await self._sock_sendfile_fallback(sock, file,
875                                                  offset, count)
876
877    async def _sock_sendfile_native(self, sock, file, offset, count):
878        # NB: sendfile syscall is not supported for SSL sockets and
879        # non-mmap files even if sendfile is supported by OS
880        raise exceptions.SendfileNotAvailableError(
881            f"syscall sendfile is not available for socket {sock!r} "
882            "and file {file!r} combination")
883
884    async def _sock_sendfile_fallback(self, sock, file, offset, count):
885        if offset:
886            file.seek(offset)
887        blocksize = (
888            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
889            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
890        )
891        buf = bytearray(blocksize)
892        total_sent = 0
893        try:
894            while True:
895                if count:
896                    blocksize = min(count - total_sent, blocksize)
897                    if blocksize <= 0:
898                        break
899                view = memoryview(buf)[:blocksize]
900                read = await self.run_in_executor(None, file.readinto, view)
901                if not read:
902                    break  # EOF
903                await self.sock_sendall(sock, view[:read])
904                total_sent += read
905            return total_sent
906        finally:
907            if total_sent > 0 and hasattr(file, 'seek'):
908                file.seek(offset + total_sent)
909
910    def _check_sendfile_params(self, sock, file, offset, count):
911        if 'b' not in getattr(file, 'mode', 'b'):
912            raise ValueError("file should be opened in binary mode")
913        if not sock.type == socket.SOCK_STREAM:
914            raise ValueError("only SOCK_STREAM type sockets are supported")
915        if count is not None:
916            if not isinstance(count, int):
917                raise TypeError(
918                    "count must be a positive integer (got {!r})".format(count))
919            if count <= 0:
920                raise ValueError(
921                    "count must be a positive integer (got {!r})".format(count))
922        if not isinstance(offset, int):
923            raise TypeError(
924                "offset must be a non-negative integer (got {!r})".format(
925                    offset))
926        if offset < 0:
927            raise ValueError(
928                "offset must be a non-negative integer (got {!r})".format(
929                    offset))
930
931    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
932        """Create, bind and connect one socket."""
933        my_exceptions = []
934        exceptions.append(my_exceptions)
935        family, type_, proto, _, address = addr_info
936        sock = None
937        try:
938            sock = socket.socket(family=family, type=type_, proto=proto)
939            sock.setblocking(False)
940            if local_addr_infos is not None:
941                for _, _, _, _, laddr in local_addr_infos:
942                    try:
943                        sock.bind(laddr)
944                        break
945                    except OSError as exc:
946                        msg = (
947                            f'error while attempting to bind on '
948                            f'address {laddr!r}: '
949                            f'{exc.strerror.lower()}'
950                        )
951                        exc = OSError(exc.errno, msg)
952                        my_exceptions.append(exc)
953                else:  # all bind attempts failed
954                    raise my_exceptions.pop()
955            await self.sock_connect(sock, address)
956            return sock
957        except OSError as exc:
958            my_exceptions.append(exc)
959            if sock is not None:
960                sock.close()
961            raise
962        except:
963            if sock is not None:
964                sock.close()
965            raise
966
967    async def create_connection(
968            self, protocol_factory, host=None, port=None,
969            *, ssl=None, family=0,
970            proto=0, flags=0, sock=None,
971            local_addr=None, server_hostname=None,
972            ssl_handshake_timeout=None,
973            happy_eyeballs_delay=None, interleave=None):
974        """Connect to a TCP server.
975
976        Create a streaming transport connection to a given Internet host and
977        port: socket family AF_INET or socket.AF_INET6 depending on host (or
978        family if specified), socket type SOCK_STREAM. protocol_factory must be
979        a callable returning a protocol instance.
980
981        This method is a coroutine which will try to establish the connection
982        in the background.  When successful, the coroutine returns a
983        (transport, protocol) pair.
984        """
985        if server_hostname is not None and not ssl:
986            raise ValueError('server_hostname is only meaningful with ssl')
987
988        if server_hostname is None and ssl:
989            # Use host as default for server_hostname.  It is an error
990            # if host is empty or not set, e.g. when an
991            # already-connected socket was passed or when only a port
992            # is given.  To avoid this error, you can pass
993            # server_hostname='' -- this will bypass the hostname
994            # check.  (This also means that if host is a numeric
995            # IP/IPv6 address, we will attempt to verify that exact
996            # address; this will probably fail, but it is possible to
997            # create a certificate for a specific IP address, so we
998            # don't judge it here.)
999            if not host:
1000                raise ValueError('You must set server_hostname '
1001                                 'when using ssl without a host')
1002            server_hostname = host
1003
1004        if ssl_handshake_timeout is not None and not ssl:
1005            raise ValueError(
1006                'ssl_handshake_timeout is only meaningful with ssl')
1007
1008        if happy_eyeballs_delay is not None and interleave is None:
1009            # If using happy eyeballs, default to interleave addresses by family
1010            interleave = 1
1011
1012        if host is not None or port is not None:
1013            if sock is not None:
1014                raise ValueError(
1015                    'host/port and sock can not be specified at the same time')
1016
1017            infos = await self._ensure_resolved(
1018                (host, port), family=family,
1019                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1020            if not infos:
1021                raise OSError('getaddrinfo() returned empty list')
1022
1023            if local_addr is not None:
1024                laddr_infos = await self._ensure_resolved(
1025                    local_addr, family=family,
1026                    type=socket.SOCK_STREAM, proto=proto,
1027                    flags=flags, loop=self)
1028                if not laddr_infos:
1029                    raise OSError('getaddrinfo() returned empty list')
1030            else:
1031                laddr_infos = None
1032
1033            if interleave:
1034                infos = _interleave_addrinfos(infos, interleave)
1035
1036            exceptions = []
1037            if happy_eyeballs_delay is None:
1038                # not using happy eyeballs
1039                for addrinfo in infos:
1040                    try:
1041                        sock = await self._connect_sock(
1042                            exceptions, addrinfo, laddr_infos)
1043                        break
1044                    except OSError:
1045                        continue
1046            else:  # using happy eyeballs
1047                sock, _, _ = await staggered.staggered_race(
1048                    (functools.partial(self._connect_sock,
1049                                       exceptions, addrinfo, laddr_infos)
1050                     for addrinfo in infos),
1051                    happy_eyeballs_delay, loop=self)
1052
1053            if sock is None:
1054                exceptions = [exc for sub in exceptions for exc in sub]
1055                if len(exceptions) == 1:
1056                    raise exceptions[0]
1057                else:
1058                    # If they all have the same str(), raise one.
1059                    model = str(exceptions[0])
1060                    if all(str(exc) == model for exc in exceptions):
1061                        raise exceptions[0]
1062                    # Raise a combined exception so the user can see all
1063                    # the various error messages.
1064                    raise OSError('Multiple exceptions: {}'.format(
1065                        ', '.join(str(exc) for exc in exceptions)))
1066
1067        else:
1068            if sock is None:
1069                raise ValueError(
1070                    'host and port was not specified and no sock specified')
1071            if sock.type != socket.SOCK_STREAM:
1072                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1073                # are SOCK_STREAM.
1074                # We support passing AF_UNIX sockets even though we have
1075                # a dedicated API for that: create_unix_connection.
1076                # Disallowing AF_UNIX in this method, breaks backwards
1077                # compatibility.
1078                raise ValueError(
1079                    f'A Stream Socket was expected, got {sock!r}')
1080
1081        transport, protocol = await self._create_connection_transport(
1082            sock, protocol_factory, ssl, server_hostname,
1083            ssl_handshake_timeout=ssl_handshake_timeout)
1084        if self._debug:
1085            # Get the socket from the transport because SSL transport closes
1086            # the old socket and creates a new SSL socket
1087            sock = transport.get_extra_info('socket')
1088            logger.debug("%r connected to %s:%r: (%r, %r)",
1089                         sock, host, port, transport, protocol)
1090        return transport, protocol
1091
1092    async def _create_connection_transport(
1093            self, sock, protocol_factory, ssl,
1094            server_hostname, server_side=False,
1095            ssl_handshake_timeout=None):
1096
1097        sock.setblocking(False)
1098
1099        protocol = protocol_factory()
1100        waiter = self.create_future()
1101        if ssl:
1102            sslcontext = None if isinstance(ssl, bool) else ssl
1103            transport = self._make_ssl_transport(
1104                sock, protocol, sslcontext, waiter,
1105                server_side=server_side, server_hostname=server_hostname,
1106                ssl_handshake_timeout=ssl_handshake_timeout)
1107        else:
1108            transport = self._make_socket_transport(sock, protocol, waiter)
1109
1110        try:
1111            await waiter
1112        except:
1113            transport.close()
1114            raise
1115
1116        return transport, protocol
1117
1118    async def sendfile(self, transport, file, offset=0, count=None,
1119                       *, fallback=True):
1120        """Send a file to transport.
1121
1122        Return the total number of bytes which were sent.
1123
1124        The method uses high-performance os.sendfile if available.
1125
1126        file must be a regular file object opened in binary mode.
1127
1128        offset tells from where to start reading the file. If specified,
1129        count is the total number of bytes to transmit as opposed to
1130        sending the file until EOF is reached. File position is updated on
1131        return or also in case of error in which case file.tell()
1132        can be used to figure out the number of bytes
1133        which were sent.
1134
1135        fallback set to True makes asyncio to manually read and send
1136        the file when the platform does not support the sendfile syscall
1137        (e.g. Windows or SSL socket on Unix).
1138
1139        Raise SendfileNotAvailableError if the system does not support
1140        sendfile syscall and fallback is False.
1141        """
1142        if transport.is_closing():
1143            raise RuntimeError("Transport is closing")
1144        mode = getattr(transport, '_sendfile_compatible',
1145                       constants._SendfileMode.UNSUPPORTED)
1146        if mode is constants._SendfileMode.UNSUPPORTED:
1147            raise RuntimeError(
1148                f"sendfile is not supported for transport {transport!r}")
1149        if mode is constants._SendfileMode.TRY_NATIVE:
1150            try:
1151                return await self._sendfile_native(transport, file,
1152                                                   offset, count)
1153            except exceptions.SendfileNotAvailableError as exc:
1154                if not fallback:
1155                    raise
1156
1157        if not fallback:
1158            raise RuntimeError(
1159                f"fallback is disabled and native sendfile is not "
1160                f"supported for transport {transport!r}")
1161
1162        return await self._sendfile_fallback(transport, file,
1163                                             offset, count)
1164
1165    async def _sendfile_native(self, transp, file, offset, count):
1166        raise exceptions.SendfileNotAvailableError(
1167            "sendfile syscall is not supported")
1168
1169    async def _sendfile_fallback(self, transp, file, offset, count):
1170        if offset:
1171            file.seek(offset)
1172        blocksize = min(count, 16384) if count else 16384
1173        buf = bytearray(blocksize)
1174        total_sent = 0
1175        proto = _SendfileFallbackProtocol(transp)
1176        try:
1177            while True:
1178                if count:
1179                    blocksize = min(count - total_sent, blocksize)
1180                    if blocksize <= 0:
1181                        return total_sent
1182                view = memoryview(buf)[:blocksize]
1183                read = await self.run_in_executor(None, file.readinto, view)
1184                if not read:
1185                    return total_sent  # EOF
1186                await proto.drain()
1187                transp.write(view[:read])
1188                total_sent += read
1189        finally:
1190            if total_sent > 0 and hasattr(file, 'seek'):
1191                file.seek(offset + total_sent)
1192            await proto.restore()
1193
1194    async def start_tls(self, transport, protocol, sslcontext, *,
1195                        server_side=False,
1196                        server_hostname=None,
1197                        ssl_handshake_timeout=None):
1198        """Upgrade transport to TLS.
1199
1200        Return a new transport that *protocol* should start using
1201        immediately.
1202        """
1203        if ssl is None:
1204            raise RuntimeError('Python ssl module is not available')
1205
1206        if not isinstance(sslcontext, ssl.SSLContext):
1207            raise TypeError(
1208                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1209                f'got {sslcontext!r}')
1210
1211        if not getattr(transport, '_start_tls_compatible', False):
1212            raise TypeError(
1213                f'transport {transport!r} is not supported by start_tls()')
1214
1215        waiter = self.create_future()
1216        ssl_protocol = sslproto.SSLProtocol(
1217            self, protocol, sslcontext, waiter,
1218            server_side, server_hostname,
1219            ssl_handshake_timeout=ssl_handshake_timeout,
1220            call_connection_made=False)
1221
1222        # Pause early so that "ssl_protocol.data_received()" doesn't
1223        # have a chance to get called before "ssl_protocol.connection_made()".
1224        transport.pause_reading()
1225
1226        transport.set_protocol(ssl_protocol)
1227        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1228        resume_cb = self.call_soon(transport.resume_reading)
1229
1230        try:
1231            await waiter
1232        except BaseException:
1233            transport.close()
1234            conmade_cb.cancel()
1235            resume_cb.cancel()
1236            raise
1237
1238        return ssl_protocol._app_transport
1239
1240    async def create_datagram_endpoint(self, protocol_factory,
1241                                       local_addr=None, remote_addr=None, *,
1242                                       family=0, proto=0, flags=0,
1243                                       reuse_address=_unset, reuse_port=None,
1244                                       allow_broadcast=None, sock=None):
1245        """Create datagram connection."""
1246        if sock is not None:
1247            if sock.type != socket.SOCK_DGRAM:
1248                raise ValueError(
1249                    f'A UDP Socket was expected, got {sock!r}')
1250            if (local_addr or remote_addr or
1251                    family or proto or flags or
1252                    reuse_port or allow_broadcast):
1253                # show the problematic kwargs in exception msg
1254                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1255                            family=family, proto=proto, flags=flags,
1256                            reuse_address=reuse_address, reuse_port=reuse_port,
1257                            allow_broadcast=allow_broadcast)
1258                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1259                raise ValueError(
1260                    f'socket modifier keyword arguments can not be used '
1261                    f'when sock is specified. ({problems})')
1262            sock.setblocking(False)
1263            r_addr = None
1264        else:
1265            if not (local_addr or remote_addr):
1266                if family == 0:
1267                    raise ValueError('unexpected address family')
1268                addr_pairs_info = (((family, proto), (None, None)),)
1269            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1270                for addr in (local_addr, remote_addr):
1271                    if addr is not None and not isinstance(addr, str):
1272                        raise TypeError('string is expected')
1273
1274                if local_addr and local_addr[0] not in (0, '\x00'):
1275                    try:
1276                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1277                            os.remove(local_addr)
1278                    except FileNotFoundError:
1279                        pass
1280                    except OSError as err:
1281                        # Directory may have permissions only to create socket.
1282                        logger.error('Unable to check or remove stale UNIX '
1283                                     'socket %r: %r',
1284                                     local_addr, err)
1285
1286                addr_pairs_info = (((family, proto),
1287                                    (local_addr, remote_addr)), )
1288            else:
1289                # join address by (family, protocol)
1290                addr_infos = {}  # Using order preserving dict
1291                for idx, addr in ((0, local_addr), (1, remote_addr)):
1292                    if addr is not None:
1293                        assert isinstance(addr, tuple) and len(addr) == 2, (
1294                            '2-tuple is expected')
1295
1296                        infos = await self._ensure_resolved(
1297                            addr, family=family, type=socket.SOCK_DGRAM,
1298                            proto=proto, flags=flags, loop=self)
1299                        if not infos:
1300                            raise OSError('getaddrinfo() returned empty list')
1301
1302                        for fam, _, pro, _, address in infos:
1303                            key = (fam, pro)
1304                            if key not in addr_infos:
1305                                addr_infos[key] = [None, None]
1306                            addr_infos[key][idx] = address
1307
1308                # each addr has to have info for each (family, proto) pair
1309                addr_pairs_info = [
1310                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1311                    if not ((local_addr and addr_pair[0] is None) or
1312                            (remote_addr and addr_pair[1] is None))]
1313
1314                if not addr_pairs_info:
1315                    raise ValueError('can not get address information')
1316
1317            exceptions = []
1318
1319            # bpo-37228
1320            if reuse_address is not _unset:
1321                if reuse_address:
1322                    raise ValueError("Passing `reuse_address=True` is no "
1323                                     "longer supported, as the usage of "
1324                                     "SO_REUSEPORT in UDP poses a significant "
1325                                     "security concern.")
1326                else:
1327                    warnings.warn("The *reuse_address* parameter has been "
1328                                  "deprecated as of 3.5.10 and is scheduled "
1329                                  "for removal in 3.11.", DeprecationWarning,
1330                                  stacklevel=2)
1331
1332            for ((family, proto),
1333                 (local_address, remote_address)) in addr_pairs_info:
1334                sock = None
1335                r_addr = None
1336                try:
1337                    sock = socket.socket(
1338                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1339                    if reuse_port:
1340                        _set_reuseport(sock)
1341                    if allow_broadcast:
1342                        sock.setsockopt(
1343                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1344                    sock.setblocking(False)
1345
1346                    if local_addr:
1347                        sock.bind(local_address)
1348                    if remote_addr:
1349                        if not allow_broadcast:
1350                            await self.sock_connect(sock, remote_address)
1351                        r_addr = remote_address
1352                except OSError as exc:
1353                    if sock is not None:
1354                        sock.close()
1355                    exceptions.append(exc)
1356                except:
1357                    if sock is not None:
1358                        sock.close()
1359                    raise
1360                else:
1361                    break
1362            else:
1363                raise exceptions[0]
1364
1365        protocol = protocol_factory()
1366        waiter = self.create_future()
1367        transport = self._make_datagram_transport(
1368            sock, protocol, r_addr, waiter)
1369        if self._debug:
1370            if local_addr:
1371                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1372                            "created: (%r, %r)",
1373                            local_addr, remote_addr, transport, protocol)
1374            else:
1375                logger.debug("Datagram endpoint remote_addr=%r created: "
1376                             "(%r, %r)",
1377                             remote_addr, transport, protocol)
1378
1379        try:
1380            await waiter
1381        except:
1382            transport.close()
1383            raise
1384
1385        return transport, protocol
1386
1387    async def _ensure_resolved(self, address, *,
1388                               family=0, type=socket.SOCK_STREAM,
1389                               proto=0, flags=0, loop):
1390        host, port = address[:2]
1391        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1392        if info is not None:
1393            # "host" is already a resolved IP.
1394            return [info]
1395        else:
1396            return await loop.getaddrinfo(host, port, family=family, type=type,
1397                                          proto=proto, flags=flags)
1398
1399    async def _create_server_getaddrinfo(self, host, port, family, flags):
1400        infos = await self._ensure_resolved((host, port), family=family,
1401                                            type=socket.SOCK_STREAM,
1402                                            flags=flags, loop=self)
1403        if not infos:
1404            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1405        return infos
1406
1407    async def create_server(
1408            self, protocol_factory, host=None, port=None,
1409            *,
1410            family=socket.AF_UNSPEC,
1411            flags=socket.AI_PASSIVE,
1412            sock=None,
1413            backlog=100,
1414            ssl=None,
1415            reuse_address=None,
1416            reuse_port=None,
1417            ssl_handshake_timeout=None,
1418            start_serving=True):
1419        """Create a TCP server.
1420
1421        The host parameter can be a string, in that case the TCP server is
1422        bound to host and port.
1423
1424        The host parameter can also be a sequence of strings and in that case
1425        the TCP server is bound to all hosts of the sequence. If a host
1426        appears multiple times (possibly indirectly e.g. when hostnames
1427        resolve to the same IP address), the server is only bound once to that
1428        host.
1429
1430        Return a Server object which can be used to stop the service.
1431
1432        This method is a coroutine.
1433        """
1434        if isinstance(ssl, bool):
1435            raise TypeError('ssl argument must be an SSLContext or None')
1436
1437        if ssl_handshake_timeout is not None and ssl is None:
1438            raise ValueError(
1439                'ssl_handshake_timeout is only meaningful with ssl')
1440
1441        if host is not None or port is not None:
1442            if sock is not None:
1443                raise ValueError(
1444                    'host/port and sock can not be specified at the same time')
1445
1446            if reuse_address is None:
1447                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1448            sockets = []
1449            if host == '':
1450                hosts = [None]
1451            elif (isinstance(host, str) or
1452                  not isinstance(host, collections.abc.Iterable)):
1453                hosts = [host]
1454            else:
1455                hosts = host
1456
1457            fs = [self._create_server_getaddrinfo(host, port, family=family,
1458                                                  flags=flags)
1459                  for host in hosts]
1460            infos = await tasks.gather(*fs, loop=self)
1461            infos = set(itertools.chain.from_iterable(infos))
1462
1463            completed = False
1464            try:
1465                for res in infos:
1466                    af, socktype, proto, canonname, sa = res
1467                    try:
1468                        sock = socket.socket(af, socktype, proto)
1469                    except socket.error:
1470                        # Assume it's a bad family/type/protocol combination.
1471                        if self._debug:
1472                            logger.warning('create_server() failed to create '
1473                                           'socket.socket(%r, %r, %r)',
1474                                           af, socktype, proto, exc_info=True)
1475                        continue
1476                    sockets.append(sock)
1477                    if reuse_address:
1478                        sock.setsockopt(
1479                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1480                    if reuse_port:
1481                        _set_reuseport(sock)
1482                    # Disable IPv4/IPv6 dual stack support (enabled by
1483                    # default on Linux) which makes a single socket
1484                    # listen on both address families.
1485                    if (_HAS_IPv6 and
1486                            af == socket.AF_INET6 and
1487                            hasattr(socket, 'IPPROTO_IPV6')):
1488                        sock.setsockopt(socket.IPPROTO_IPV6,
1489                                        socket.IPV6_V6ONLY,
1490                                        True)
1491                    try:
1492                        sock.bind(sa)
1493                    except OSError as err:
1494                        raise OSError(err.errno, 'error while attempting '
1495                                      'to bind on address %r: %s'
1496                                      % (sa, err.strerror.lower())) from None
1497                completed = True
1498            finally:
1499                if not completed:
1500                    for sock in sockets:
1501                        sock.close()
1502        else:
1503            if sock is None:
1504                raise ValueError('Neither host/port nor sock were specified')
1505            if sock.type != socket.SOCK_STREAM:
1506                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1507            sockets = [sock]
1508
1509        for sock in sockets:
1510            sock.setblocking(False)
1511
1512        server = Server(self, sockets, protocol_factory,
1513                        ssl, backlog, ssl_handshake_timeout)
1514        if start_serving:
1515            server._start_serving()
1516            # Skip one loop iteration so that all 'loop.add_reader'
1517            # go through.
1518            await tasks.sleep(0, loop=self)
1519
1520        if self._debug:
1521            logger.info("%r is serving", server)
1522        return server
1523
1524    async def connect_accepted_socket(
1525            self, protocol_factory, sock,
1526            *, ssl=None,
1527            ssl_handshake_timeout=None):
1528        """Handle an accepted connection.
1529
1530        This is used by servers that accept connections outside of
1531        asyncio but that use asyncio to handle connections.
1532
1533        This method is a coroutine.  When completed, the coroutine
1534        returns a (transport, protocol) pair.
1535        """
1536        if sock.type != socket.SOCK_STREAM:
1537            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1538
1539        if ssl_handshake_timeout is not None and not ssl:
1540            raise ValueError(
1541                'ssl_handshake_timeout is only meaningful with ssl')
1542
1543        transport, protocol = await self._create_connection_transport(
1544            sock, protocol_factory, ssl, '', server_side=True,
1545            ssl_handshake_timeout=ssl_handshake_timeout)
1546        if self._debug:
1547            # Get the socket from the transport because SSL transport closes
1548            # the old socket and creates a new SSL socket
1549            sock = transport.get_extra_info('socket')
1550            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1551        return transport, protocol
1552
1553    async def connect_read_pipe(self, protocol_factory, pipe):
1554        protocol = protocol_factory()
1555        waiter = self.create_future()
1556        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1557
1558        try:
1559            await waiter
1560        except:
1561            transport.close()
1562            raise
1563
1564        if self._debug:
1565            logger.debug('Read pipe %r connected: (%r, %r)',
1566                         pipe.fileno(), transport, protocol)
1567        return transport, protocol
1568
1569    async def connect_write_pipe(self, protocol_factory, pipe):
1570        protocol = protocol_factory()
1571        waiter = self.create_future()
1572        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1573
1574        try:
1575            await waiter
1576        except:
1577            transport.close()
1578            raise
1579
1580        if self._debug:
1581            logger.debug('Write pipe %r connected: (%r, %r)',
1582                         pipe.fileno(), transport, protocol)
1583        return transport, protocol
1584
1585    def _log_subprocess(self, msg, stdin, stdout, stderr):
1586        info = [msg]
1587        if stdin is not None:
1588            info.append(f'stdin={_format_pipe(stdin)}')
1589        if stdout is not None and stderr == subprocess.STDOUT:
1590            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1591        else:
1592            if stdout is not None:
1593                info.append(f'stdout={_format_pipe(stdout)}')
1594            if stderr is not None:
1595                info.append(f'stderr={_format_pipe(stderr)}')
1596        logger.debug(' '.join(info))
1597
1598    async def subprocess_shell(self, protocol_factory, cmd, *,
1599                               stdin=subprocess.PIPE,
1600                               stdout=subprocess.PIPE,
1601                               stderr=subprocess.PIPE,
1602                               universal_newlines=False,
1603                               shell=True, bufsize=0,
1604                               encoding=None, errors=None, text=None,
1605                               **kwargs):
1606        if not isinstance(cmd, (bytes, str)):
1607            raise ValueError("cmd must be a string")
1608        if universal_newlines:
1609            raise ValueError("universal_newlines must be False")
1610        if not shell:
1611            raise ValueError("shell must be True")
1612        if bufsize != 0:
1613            raise ValueError("bufsize must be 0")
1614        if text:
1615            raise ValueError("text must be False")
1616        if encoding is not None:
1617            raise ValueError("encoding must be None")
1618        if errors is not None:
1619            raise ValueError("errors must be None")
1620
1621        protocol = protocol_factory()
1622        debug_log = None
1623        if self._debug:
1624            # don't log parameters: they may contain sensitive information
1625            # (password) and may be too long
1626            debug_log = 'run shell command %r' % cmd
1627            self._log_subprocess(debug_log, stdin, stdout, stderr)
1628        transport = await self._make_subprocess_transport(
1629            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1630        if self._debug and debug_log is not None:
1631            logger.info('%s: %r', debug_log, transport)
1632        return transport, protocol
1633
1634    async def subprocess_exec(self, protocol_factory, program, *args,
1635                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1636                              stderr=subprocess.PIPE, universal_newlines=False,
1637                              shell=False, bufsize=0,
1638                              encoding=None, errors=None, text=None,
1639                              **kwargs):
1640        if universal_newlines:
1641            raise ValueError("universal_newlines must be False")
1642        if shell:
1643            raise ValueError("shell must be False")
1644        if bufsize != 0:
1645            raise ValueError("bufsize must be 0")
1646        if text:
1647            raise ValueError("text must be False")
1648        if encoding is not None:
1649            raise ValueError("encoding must be None")
1650        if errors is not None:
1651            raise ValueError("errors must be None")
1652
1653        popen_args = (program,) + args
1654        protocol = protocol_factory()
1655        debug_log = None
1656        if self._debug:
1657            # don't log parameters: they may contain sensitive information
1658            # (password) and may be too long
1659            debug_log = f'execute program {program!r}'
1660            self._log_subprocess(debug_log, stdin, stdout, stderr)
1661        transport = await self._make_subprocess_transport(
1662            protocol, popen_args, False, stdin, stdout, stderr,
1663            bufsize, **kwargs)
1664        if self._debug and debug_log is not None:
1665            logger.info('%s: %r', debug_log, transport)
1666        return transport, protocol
1667
1668    def get_exception_handler(self):
1669        """Return an exception handler, or None if the default one is in use.
1670        """
1671        return self._exception_handler
1672
1673    def set_exception_handler(self, handler):
1674        """Set handler as the new event loop exception handler.
1675
1676        If handler is None, the default exception handler will
1677        be set.
1678
1679        If handler is a callable object, it should have a
1680        signature matching '(loop, context)', where 'loop'
1681        will be a reference to the active event loop, 'context'
1682        will be a dict object (see `call_exception_handler()`
1683        documentation for details about context).
1684        """
1685        if handler is not None and not callable(handler):
1686            raise TypeError(f'A callable object or None is expected, '
1687                            f'got {handler!r}')
1688        self._exception_handler = handler
1689
1690    def default_exception_handler(self, context):
1691        """Default exception handler.
1692
1693        This is called when an exception occurs and no exception
1694        handler is set, and can be called by a custom exception
1695        handler that wants to defer to the default behavior.
1696
1697        This default handler logs the error message and other
1698        context-dependent information.  In debug mode, a truncated
1699        stack trace is also appended showing where the given object
1700        (e.g. a handle or future or task) was created, if any.
1701
1702        The context parameter has the same meaning as in
1703        `call_exception_handler()`.
1704        """
1705        message = context.get('message')
1706        if not message:
1707            message = 'Unhandled exception in event loop'
1708
1709        exception = context.get('exception')
1710        if exception is not None:
1711            exc_info = (type(exception), exception, exception.__traceback__)
1712        else:
1713            exc_info = False
1714
1715        if ('source_traceback' not in context and
1716                self._current_handle is not None and
1717                self._current_handle._source_traceback):
1718            context['handle_traceback'] = \
1719                self._current_handle._source_traceback
1720
1721        log_lines = [message]
1722        for key in sorted(context):
1723            if key in {'message', 'exception'}:
1724                continue
1725            value = context[key]
1726            if key == 'source_traceback':
1727                tb = ''.join(traceback.format_list(value))
1728                value = 'Object created at (most recent call last):\n'
1729                value += tb.rstrip()
1730            elif key == 'handle_traceback':
1731                tb = ''.join(traceback.format_list(value))
1732                value = 'Handle created at (most recent call last):\n'
1733                value += tb.rstrip()
1734            else:
1735                value = repr(value)
1736            log_lines.append(f'{key}: {value}')
1737
1738        logger.error('\n'.join(log_lines), exc_info=exc_info)
1739
1740    def call_exception_handler(self, context):
1741        """Call the current event loop's exception handler.
1742
1743        The context argument is a dict containing the following keys:
1744
1745        - 'message': Error message;
1746        - 'exception' (optional): Exception object;
1747        - 'future' (optional): Future instance;
1748        - 'task' (optional): Task instance;
1749        - 'handle' (optional): Handle instance;
1750        - 'protocol' (optional): Protocol instance;
1751        - 'transport' (optional): Transport instance;
1752        - 'socket' (optional): Socket instance;
1753        - 'asyncgen' (optional): Asynchronous generator that caused
1754                                 the exception.
1755
1756        New keys maybe introduced in the future.
1757
1758        Note: do not overload this method in an event loop subclass.
1759        For custom exception handling, use the
1760        `set_exception_handler()` method.
1761        """
1762        if self._exception_handler is None:
1763            try:
1764                self.default_exception_handler(context)
1765            except (SystemExit, KeyboardInterrupt):
1766                raise
1767            except BaseException:
1768                # Second protection layer for unexpected errors
1769                # in the default implementation, as well as for subclassed
1770                # event loops with overloaded "default_exception_handler".
1771                logger.error('Exception in default exception handler',
1772                             exc_info=True)
1773        else:
1774            try:
1775                self._exception_handler(self, context)
1776            except (SystemExit, KeyboardInterrupt):
1777                raise
1778            except BaseException as exc:
1779                # Exception in the user set custom exception handler.
1780                try:
1781                    # Let's try default handler.
1782                    self.default_exception_handler({
1783                        'message': 'Unhandled error in exception handler',
1784                        'exception': exc,
1785                        'context': context,
1786                    })
1787                except (SystemExit, KeyboardInterrupt):
1788                    raise
1789                except BaseException:
1790                    # Guard 'default_exception_handler' in case it is
1791                    # overloaded.
1792                    logger.error('Exception in default exception handler '
1793                                 'while handling an unexpected error '
1794                                 'in custom exception handler',
1795                                 exc_info=True)
1796
1797    def _add_callback(self, handle):
1798        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1799        assert isinstance(handle, events.Handle), 'A Handle is required here'
1800        if handle._cancelled:
1801            return
1802        assert not isinstance(handle, events.TimerHandle)
1803        self._ready.append(handle)
1804
1805    def _add_callback_signalsafe(self, handle):
1806        """Like _add_callback() but called from a signal handler."""
1807        self._add_callback(handle)
1808        self._write_to_self()
1809
1810    def _timer_handle_cancelled(self, handle):
1811        """Notification that a TimerHandle has been cancelled."""
1812        if handle._scheduled:
1813            self._timer_cancelled_count += 1
1814
1815    def _run_once(self):
1816        """Run one full iteration of the event loop.
1817
1818        This calls all currently ready callbacks, polls for I/O,
1819        schedules the resulting callbacks, and finally schedules
1820        'call_later' callbacks.
1821        """
1822
1823        sched_count = len(self._scheduled)
1824        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1825            self._timer_cancelled_count / sched_count >
1826                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1827            # Remove delayed calls that were cancelled if their number
1828            # is too high
1829            new_scheduled = []
1830            for handle in self._scheduled:
1831                if handle._cancelled:
1832                    handle._scheduled = False
1833                else:
1834                    new_scheduled.append(handle)
1835
1836            heapq.heapify(new_scheduled)
1837            self._scheduled = new_scheduled
1838            self._timer_cancelled_count = 0
1839        else:
1840            # Remove delayed calls that were cancelled from head of queue.
1841            while self._scheduled and self._scheduled[0]._cancelled:
1842                self._timer_cancelled_count -= 1
1843                handle = heapq.heappop(self._scheduled)
1844                handle._scheduled = False
1845
1846        timeout = None
1847        if self._ready or self._stopping:
1848            timeout = 0
1849        elif self._scheduled:
1850            # Compute the desired timeout.
1851            when = self._scheduled[0]._when
1852            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1853
1854        event_list = self._selector.select(timeout)
1855        self._process_events(event_list)
1856
1857        # Handle 'later' callbacks that are ready.
1858        end_time = self.time() + self._clock_resolution
1859        while self._scheduled:
1860            handle = self._scheduled[0]
1861            if handle._when >= end_time:
1862                break
1863            handle = heapq.heappop(self._scheduled)
1864            handle._scheduled = False
1865            self._ready.append(handle)
1866
1867        # This is the only place where callbacks are actually *called*.
1868        # All other places just add them to ready.
1869        # Note: We run all currently scheduled callbacks, but not any
1870        # callbacks scheduled by callbacks run this time around --
1871        # they will be run the next time (after another I/O poll).
1872        # Use an idiom that is thread-safe without using locks.
1873        ntodo = len(self._ready)
1874        for i in range(ntodo):
1875            handle = self._ready.popleft()
1876            if handle._cancelled:
1877                continue
1878            if self._debug:
1879                try:
1880                    self._current_handle = handle
1881                    t0 = self.time()
1882                    handle._run()
1883                    dt = self.time() - t0
1884                    if dt >= self.slow_callback_duration:
1885                        logger.warning('Executing %s took %.3f seconds',
1886                                       _format_handle(handle), dt)
1887                finally:
1888                    self._current_handle = None
1889            else:
1890                handle._run()
1891        handle = None  # Needed to break cycles when an exception occurs.
1892
1893    def _set_coroutine_origin_tracking(self, enabled):
1894        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1895            return
1896
1897        if enabled:
1898            self._coroutine_origin_tracking_saved_depth = (
1899                sys.get_coroutine_origin_tracking_depth())
1900            sys.set_coroutine_origin_tracking_depth(
1901                constants.DEBUG_STACK_DEPTH)
1902        else:
1903            sys.set_coroutine_origin_tracking_depth(
1904                self._coroutine_origin_tracking_saved_depth)
1905
1906        self._coroutine_origin_tracking_enabled = enabled
1907
1908    def get_debug(self):
1909        return self._debug
1910
1911    def set_debug(self, enabled):
1912        self._debug = enabled
1913
1914        if self.is_running():
1915            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1916