• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
# Public names re-exported via `from asyncio.base_events import *`.
__all__ = 'BaseEventLoop', 'Server'
53
54
55# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
72
73
def _format_handle(handle):
    """Return a human-readable description of *handle* for debug logs."""
    callback = handle._callback
    owner = getattr(callback, '__self__', None)
    if isinstance(owner, tasks.Task):
        # The callback is a bound Task method; the task repr is more useful.
        return repr(owner)
    return str(handle)
81
82
83def _format_pipe(fd):
84    if fd == subprocess.PIPE:
85        return '<pipe>'
86    elif fd == subprocess.STDOUT:
87        return '<stdout>'
88    else:
89        return repr(fd)
90
91
92def _set_reuseport(sock):
93    if not hasattr(socket, 'SO_REUSEPORT'):
94        raise ValueError('reuse_port not supported by socket module')
95    else:
96        try:
97            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98        except OSError:
99            raise ValueError('reuse_port not supported by socket module, '
100                             'SO_REUSEPORT defined but not implemented.')
101
102
def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
    """Build an addrinfo tuple without calling getaddrinfo(), if possible.

    Users might have handled name resolution in their own code and pass
    in resolved IPs; when *host* already parses as a numeric address the
    5-tuple is constructed directly.  Returns None whenever getaddrinfo()
    is actually required.
    """
    if not hasattr(socket, 'inet_pton'):
        return

    if host is None or proto not in {0, socket.IPPROTO_TCP,
                                     socket.IPPROTO_UDP}:
        return None

    # Normalize the protocol from the socket type.
    if type == socket.SOCK_STREAM:
        proto = socket.IPPROTO_TCP
    elif type == socket.SOCK_DGRAM:
        proto = socket.IPPROTO_UDP
    else:
        return None

    if port is None or port in (b'', ''):
        port = 0
    else:
        try:
            port = int(port)
        except (TypeError, ValueError):
            # If port's a service name like "http", don't skip getaddrinfo.
            return None

    if family == socket.AF_UNSPEC:
        afs = [socket.AF_INET]
        if _HAS_IPv6:
            afs.append(socket.AF_INET6)
    else:
        afs = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in afs:
        try:
            socket.inet_pton(af, host)
        except OSError:
            continue
        # The host has already been resolved.
        if _HAS_IPv6 and af == socket.AF_INET6:
            return af, type, proto, '', (host, port, flowinfo, scopeid)
        return af, type, proto, '', (host, port)

    # "host" is not an IP address.
    return None
160
161
162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163    """Interleave list of addrinfo tuples by family."""
164    # Group addresses by family
165    addrinfos_by_family = collections.OrderedDict()
166    for addr in addrinfos:
167        family = addr[0]
168        if family not in addrinfos_by_family:
169            addrinfos_by_family[family] = []
170        addrinfos_by_family[family].append(addr)
171    addrinfos_lists = list(addrinfos_by_family.values())
172
173    reordered = []
174    if first_address_family_count > 1:
175        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176        del addrinfos_lists[0][:first_address_family_count - 1]
177    reordered.extend(
178        a for a in itertools.chain.from_iterable(
179            itertools.zip_longest(*addrinfos_lists)
180        ) if a is not None)
181    return reordered
182
183
def _run_until_complete_cb(fut):
    """Done-callback installed by run_until_complete() to stop the loop."""
    if fut.cancelled():
        futures._get_loop(fut).stop()
        return
    if isinstance(fut.exception(), (SystemExit, KeyboardInterrupt)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    futures._get_loop(fut).stop()
192
193
194if hasattr(socket, 'TCP_NODELAY'):
195    def _set_nodelay(sock):
196        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197                sock.type == socket.SOCK_STREAM and
198                sock.proto == socket.IPPROTO_TCP):
199            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201    def _set_nodelay(sock):
202        pass
203
204
205def _check_ssl_socket(sock):
206    if ssl is not None and isinstance(sock, ssl.SSLSocket):
207        raise TypeError("Socket cannot be of type SSLSocket")
208
209
class _SendfileFallbackProtocol(protocols.Protocol):
    """Temporary protocol installed on a transport while sock_sendfile()
    runs its read/write fallback loop.

    It pauses reading, records the previous protocol and flow-control
    state, and offers drain()/restore() so the sendfile code can honor
    the transport's write-buffer limits and then put everything back.
    """

    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        # A future that resolves when the transport is writable again;
        # None means writing is currently allowed.
        self._write_ready_fut = (
            self._transport._loop.create_future()
            if self._should_resume_writing else None)

    async def drain(self):
        """Wait until the transport accepts more data."""
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is not None:
            await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                failure = ConnectionError("Connection is closed by peer")
            else:
                failure = exc
            self._write_ready_fut.set_exception(failure)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is None:
            self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        fut = self._write_ready_fut
        if fut is not None:
            fut.set_result(False)
            self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        """Reinstall the original protocol and flow-control state."""
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()
276
277
class Server(events.AbstractServer):
    """Listening server returned by loop.create_server().

    Tracks the listening sockets, the number of live client transports,
    and futures created by wait_closed().
    """

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        # Becomes None once close() has been called.
        self._sockets = sockets
        # Number of client transports currently attached.
        self._active_count = 0
        # Futures from wait_closed(); set to None by _wakeup().
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        # Invoked by the loop when a new client transport is created.
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        # Invoked by the loop when a client transport goes away.
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            # Server is closed and the last connection is gone.
            self._wakeup()

    def _wakeup(self):
        # Resolve every pending wait_closed() future.
        waiters, self._waiters = self._waiters, None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    def _start_serving(self):
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        """Return the event loop this server belongs to."""
        return self._loop

    def is_serving(self):
        """Return True if the server is accepting new connections."""
        return self._serving

    @property
    def sockets(self):
        """Read-only view of the listening sockets; () once closed."""
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        """Stop listening.  Existing connections are left alone."""
        sockets = self._sockets
        if sockets is None:
            # Already closed; close() is idempotent.
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        fut = self._serving_forever_fut
        if fut is not None and not fut.done():
            fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        """Begin accepting connections (idempotent)."""
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0)

    async def serve_forever(self):
        """Serve until the awaiting task is cancelled."""
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        """Wait until pending wait_closed() waiters are woken.

        NOTE(review): returns immediately while the server is still open
        (self._sockets is not None) — this mirrors the historical
        behaviour; confirm before changing.
        """
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter
387
388
389class BaseEventLoop(events.AbstractEventLoop):
390
391    def __init__(self):
392        self._timer_cancelled_count = 0
393        self._closed = False
394        self._stopping = False
395        self._ready = collections.deque()
396        self._scheduled = []
397        self._default_executor = None
398        self._internal_fds = 0
399        # Identifier of the thread running the event loop, or None if the
400        # event loop is not running
401        self._thread_id = None
402        self._clock_resolution = time.get_clock_info('monotonic').resolution
403        self._exception_handler = None
404        self.set_debug(coroutines._is_debug_mode())
405        # In debug mode, if the execution of a callback or a step of a task
406        # exceed this duration in seconds, the slow callback/task is logged.
407        self.slow_callback_duration = 0.1
408        self._current_handle = None
409        self._task_factory = None
410        self._coroutine_origin_tracking_enabled = False
411        self._coroutine_origin_tracking_saved_depth = None
412
413        # A weak set of all asynchronous generators that are
414        # being iterated by the loop.
415        self._asyncgens = weakref.WeakSet()
416        # Set to True when `loop.shutdown_asyncgens` is called.
417        self._asyncgens_shutdown_called = False
418        # Set to True when `loop.shutdown_default_executor` is called.
419        self._executor_shutdown_called = False
420
421    def __repr__(self):
422        return (
423            f'<{self.__class__.__name__} running={self.is_running()} '
424            f'closed={self.is_closed()} debug={self.get_debug()}>'
425        )
426
427    def create_future(self):
428        """Create a Future object attached to the loop."""
429        return futures.Future(loop=self)
430
431    def create_task(self, coro, *, name=None):
432        """Schedule a coroutine object.
433
434        Return a task object.
435        """
436        self._check_closed()
437        if self._task_factory is None:
438            task = tasks.Task(coro, loop=self, name=name)
439            if task._source_traceback:
440                del task._source_traceback[-1]
441        else:
442            task = self._task_factory(self, coro)
443            tasks._set_task_name(task, name)
444
445        return task
446
447    def set_task_factory(self, factory):
448        """Set a task factory that will be used by loop.create_task().
449
450        If factory is None the default task factory will be set.
451
452        If factory is a callable, it should have a signature matching
453        '(loop, coro)', where 'loop' will be a reference to the active
454        event loop, 'coro' will be a coroutine object.  The callable
455        must return a Future.
456        """
457        if factory is not None and not callable(factory):
458            raise TypeError('task factory must be a callable or None')
459        self._task_factory = factory
460
461    def get_task_factory(self):
462        """Return a task factory, or None if the default one is in use."""
463        return self._task_factory
464
465    def _make_socket_transport(self, sock, protocol, waiter=None, *,
466                               extra=None, server=None):
467        """Create socket transport."""
468        raise NotImplementedError
469
470    def _make_ssl_transport(
471            self, rawsock, protocol, sslcontext, waiter=None,
472            *, server_side=False, server_hostname=None,
473            extra=None, server=None,
474            ssl_handshake_timeout=None,
475            call_connection_made=True):
476        """Create SSL transport."""
477        raise NotImplementedError
478
479    def _make_datagram_transport(self, sock, protocol,
480                                 address=None, waiter=None, extra=None):
481        """Create datagram transport."""
482        raise NotImplementedError
483
484    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
485                                  extra=None):
486        """Create read pipe transport."""
487        raise NotImplementedError
488
489    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
490                                   extra=None):
491        """Create write pipe transport."""
492        raise NotImplementedError
493
494    async def _make_subprocess_transport(self, protocol, args, shell,
495                                         stdin, stdout, stderr, bufsize,
496                                         extra=None, **kwargs):
497        """Create subprocess transport."""
498        raise NotImplementedError
499
500    def _write_to_self(self):
501        """Write a byte to self-pipe, to wake up the event loop.
502
503        This may be called from a different thread.
504
505        The subclass is responsible for implementing the self-pipe.
506        """
507        raise NotImplementedError
508
509    def _process_events(self, event_list):
510        """Process selector events."""
511        raise NotImplementedError
512
513    def _check_closed(self):
514        if self._closed:
515            raise RuntimeError('Event loop is closed')
516
517    def _check_default_executor(self):
518        if self._executor_shutdown_called:
519            raise RuntimeError('Executor shutdown has been called')
520
521    def _asyncgen_finalizer_hook(self, agen):
522        self._asyncgens.discard(agen)
523        if not self.is_closed():
524            self.call_soon_threadsafe(self.create_task, agen.aclose())
525
526    def _asyncgen_firstiter_hook(self, agen):
527        if self._asyncgens_shutdown_called:
528            warnings.warn(
529                f"asynchronous generator {agen!r} was scheduled after "
530                f"loop.shutdown_asyncgens() call",
531                ResourceWarning, source=self)
532
533        self._asyncgens.add(agen)
534
535    async def shutdown_asyncgens(self):
536        """Shutdown all active asynchronous generators."""
537        self._asyncgens_shutdown_called = True
538
539        if not len(self._asyncgens):
540            # If Python version is <3.6 or we don't have any asynchronous
541            # generators alive.
542            return
543
544        closing_agens = list(self._asyncgens)
545        self._asyncgens.clear()
546
547        results = await tasks.gather(
548            *[ag.aclose() for ag in closing_agens],
549            return_exceptions=True)
550
551        for result, agen in zip(results, closing_agens):
552            if isinstance(result, Exception):
553                self.call_exception_handler({
554                    'message': f'an error occurred during closing of '
555                               f'asynchronous generator {agen!r}',
556                    'exception': result,
557                    'asyncgen': agen
558                })
559
560    async def shutdown_default_executor(self):
561        """Schedule the shutdown of the default executor."""
562        self._executor_shutdown_called = True
563        if self._default_executor is None:
564            return
565        future = self.create_future()
566        thread = threading.Thread(target=self._do_shutdown, args=(future,))
567        thread.start()
568        try:
569            await future
570        finally:
571            thread.join()
572
573    def _do_shutdown(self, future):
574        try:
575            self._default_executor.shutdown(wait=True)
576            self.call_soon_threadsafe(future.set_result, None)
577        except Exception as ex:
578            self.call_soon_threadsafe(future.set_exception, ex)
579
580    def _check_running(self):
581        if self.is_running():
582            raise RuntimeError('This event loop is already running')
583        if events._get_running_loop() is not None:
584            raise RuntimeError(
585                'Cannot run the event loop while another loop is running')
586
587    def run_forever(self):
588        """Run until stop() is called."""
589        self._check_closed()
590        self._check_running()
591        self._set_coroutine_origin_tracking(self._debug)
592        self._thread_id = threading.get_ident()
593
594        old_agen_hooks = sys.get_asyncgen_hooks()
595        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
596                               finalizer=self._asyncgen_finalizer_hook)
597        try:
598            events._set_running_loop(self)
599            while True:
600                self._run_once()
601                if self._stopping:
602                    break
603        finally:
604            self._stopping = False
605            self._thread_id = None
606            events._set_running_loop(None)
607            self._set_coroutine_origin_tracking(False)
608            sys.set_asyncgen_hooks(*old_agen_hooks)
609
610    def run_until_complete(self, future):
611        """Run until the Future is done.
612
613        If the argument is a coroutine, it is wrapped in a Task.
614
615        WARNING: It would be disastrous to call run_until_complete()
616        with the same coroutine twice -- it would wrap it in two
617        different Tasks and that can't be good.
618
619        Return the Future's result, or raise its exception.
620        """
621        self._check_closed()
622        self._check_running()
623
624        new_task = not futures.isfuture(future)
625        future = tasks.ensure_future(future, loop=self)
626        if new_task:
627            # An exception is raised if the future didn't complete, so there
628            # is no need to log the "destroy pending task" message
629            future._log_destroy_pending = False
630
631        future.add_done_callback(_run_until_complete_cb)
632        try:
633            self.run_forever()
634        except:
635            if new_task and future.done() and not future.cancelled():
636                # The coroutine raised a BaseException. Consume the exception
637                # to not log a warning, the caller doesn't have access to the
638                # local task.
639                future.exception()
640            raise
641        finally:
642            future.remove_done_callback(_run_until_complete_cb)
643        if not future.done():
644            raise RuntimeError('Event loop stopped before Future completed.')
645
646        return future.result()
647
648    def stop(self):
649        """Stop running the event loop.
650
651        Every callback already scheduled will still run.  This simply informs
652        run_forever to stop looping after a complete iteration.
653        """
654        self._stopping = True
655
656    def close(self):
657        """Close the event loop.
658
659        This clears the queues and shuts down the executor,
660        but does not wait for the executor to finish.
661
662        The event loop must not be running.
663        """
664        if self.is_running():
665            raise RuntimeError("Cannot close a running event loop")
666        if self._closed:
667            return
668        if self._debug:
669            logger.debug("Close %r", self)
670        self._closed = True
671        self._ready.clear()
672        self._scheduled.clear()
673        self._executor_shutdown_called = True
674        executor = self._default_executor
675        if executor is not None:
676            self._default_executor = None
677            executor.shutdown(wait=False)
678
679    def is_closed(self):
680        """Returns True if the event loop was closed."""
681        return self._closed
682
683    def __del__(self, _warn=warnings.warn):
684        if not self.is_closed():
685            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
686            if not self.is_running():
687                self.close()
688
689    def is_running(self):
690        """Returns True if the event loop is running."""
691        return (self._thread_id is not None)
692
693    def time(self):
694        """Return the time according to the event loop's clock.
695
696        This is a float expressed in seconds since an epoch, but the
697        epoch, precision, accuracy and drift are unspecified and may
698        differ per event loop.
699        """
700        return time.monotonic()
701
702    def call_later(self, delay, callback, *args, context=None):
703        """Arrange for a callback to be called at a given time.
704
705        Return a Handle: an opaque object with a cancel() method that
706        can be used to cancel the call.
707
708        The delay can be an int or float, expressed in seconds.  It is
709        always relative to the current time.
710
711        Each callback will be called exactly once.  If two callbacks
712        are scheduled for exactly the same time, it undefined which
713        will be called first.
714
715        Any positional arguments after the callback will be passed to
716        the callback when it is called.
717        """
718        timer = self.call_at(self.time() + delay, callback, *args,
719                             context=context)
720        if timer._source_traceback:
721            del timer._source_traceback[-1]
722        return timer
723
724    def call_at(self, when, callback, *args, context=None):
725        """Like call_later(), but uses an absolute time.
726
727        Absolute time corresponds to the event loop's time() method.
728        """
729        self._check_closed()
730        if self._debug:
731            self._check_thread()
732            self._check_callback(callback, 'call_at')
733        timer = events.TimerHandle(when, callback, args, self, context)
734        if timer._source_traceback:
735            del timer._source_traceback[-1]
736        heapq.heappush(self._scheduled, timer)
737        timer._scheduled = True
738        return timer
739
740    def call_soon(self, callback, *args, context=None):
741        """Arrange for a callback to be called as soon as possible.
742
743        This operates as a FIFO queue: callbacks are called in the
744        order in which they are registered.  Each callback will be
745        called exactly once.
746
747        Any positional arguments after the callback will be passed to
748        the callback when it is called.
749        """
750        self._check_closed()
751        if self._debug:
752            self._check_thread()
753            self._check_callback(callback, 'call_soon')
754        handle = self._call_soon(callback, args, context)
755        if handle._source_traceback:
756            del handle._source_traceback[-1]
757        return handle
758
759    def _check_callback(self, callback, method):
760        if (coroutines.iscoroutine(callback) or
761                coroutines.iscoroutinefunction(callback)):
762            raise TypeError(
763                f"coroutines cannot be used with {method}()")
764        if not callable(callback):
765            raise TypeError(
766                f'a callable object was expected by {method}(), '
767                f'got {callback!r}')
768
769    def _call_soon(self, callback, args, context):
770        handle = events.Handle(callback, args, self, context)
771        if handle._source_traceback:
772            del handle._source_traceback[-1]
773        self._ready.append(handle)
774        return handle
775
776    def _check_thread(self):
777        """Check that the current thread is the thread running the event loop.
778
779        Non-thread-safe methods of this class make this assumption and will
780        likely behave incorrectly when the assumption is violated.
781
782        Should only be called when (self._debug == True).  The caller is
783        responsible for checking this condition for performance reasons.
784        """
785        if self._thread_id is None:
786            return
787        thread_id = threading.get_ident()
788        if thread_id != self._thread_id:
789            raise RuntimeError(
790                "Non-thread-safe operation invoked on an event loop other "
791                "than the current one")
792
793    def call_soon_threadsafe(self, callback, *args, context=None):
794        """Like call_soon(), but thread-safe."""
795        self._check_closed()
796        if self._debug:
797            self._check_callback(callback, 'call_soon_threadsafe')
798        handle = self._call_soon(callback, args, context)
799        if handle._source_traceback:
800            del handle._source_traceback[-1]
801        self._write_to_self()
802        return handle
803
804    def run_in_executor(self, executor, func, *args):
805        self._check_closed()
806        if self._debug:
807            self._check_callback(func, 'run_in_executor')
808        if executor is None:
809            executor = self._default_executor
810            # Only check when the default executor is being used
811            self._check_default_executor()
812            if executor is None:
813                executor = concurrent.futures.ThreadPoolExecutor(
814                    thread_name_prefix='asyncio'
815                )
816                self._default_executor = executor
817        return futures.wrap_future(
818            executor.submit(func, *args), loop=self)
819
820    def set_default_executor(self, executor):
821        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
822            warnings.warn(
823                'Using the default executor that is not an instance of '
824                'ThreadPoolExecutor is deprecated and will be prohibited '
825                'in Python 3.9',
826                DeprecationWarning, 2)
827        self._default_executor = executor
828
829    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
830        msg = [f"{host}:{port!r}"]
831        if family:
832            msg.append(f'family={family!r}')
833        if type:
834            msg.append(f'type={type!r}')
835        if proto:
836            msg.append(f'proto={proto!r}')
837        if flags:
838            msg.append(f'flags={flags!r}')
839        msg = ', '.join(msg)
840        logger.debug('Get address info %s', msg)
841
842        t0 = self.time()
843        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
844        dt = self.time() - t0
845
846        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
847        if dt >= self.slow_callback_duration:
848            logger.info(msg)
849        else:
850            logger.debug(msg)
851        return addrinfo
852
853    async def getaddrinfo(self, host, port, *,
854                          family=0, type=0, proto=0, flags=0):
855        if self._debug:
856            getaddr_func = self._getaddrinfo_debug
857        else:
858            getaddr_func = socket.getaddrinfo
859
860        return await self.run_in_executor(
861            None, getaddr_func, host, port, family, type, proto, flags)
862
863    async def getnameinfo(self, sockaddr, flags=0):
864        return await self.run_in_executor(
865            None, socket.getnameinfo, sockaddr, flags)
866
867    async def sock_sendfile(self, sock, file, offset=0, count=None,
868                            *, fallback=True):
869        if self._debug and sock.gettimeout() != 0:
870            raise ValueError("the socket must be non-blocking")
871        _check_ssl_socket(sock)
872        self._check_sendfile_params(sock, file, offset, count)
873        try:
874            return await self._sock_sendfile_native(sock, file,
875                                                    offset, count)
876        except exceptions.SendfileNotAvailableError as exc:
877            if not fallback:
878                raise
879        return await self._sock_sendfile_fallback(sock, file,
880                                                  offset, count)
881
882    async def _sock_sendfile_native(self, sock, file, offset, count):
883        # NB: sendfile syscall is not supported for SSL sockets and
884        # non-mmap files even if sendfile is supported by OS
885        raise exceptions.SendfileNotAvailableError(
886            f"syscall sendfile is not available for socket {sock!r} "
887            "and file {file!r} combination")
888
    async def _sock_sendfile_fallback(self, sock, file, offset, count):
        # Portable sendfile emulation: read the file in chunks (in the
        # default executor, so blocking file I/O does not stall the
        # event loop) and send each chunk with sock_sendall().
        if offset:
            file.seek(offset)
        blocksize = (
            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        )
        # Single reusable buffer; memoryview slices below avoid copies.
        buf = bytearray(blocksize)
        total_sent = 0
        try:
            while True:
                if count:
                    # Shrink the final read so we never send more than
                    # *count* bytes in total.
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        break
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    break  # EOF
                await self.sock_sendall(sock, view[:read])
                total_sent += read
            return total_sent
        finally:
            # Leave the file position just past the bytes actually sent,
            # even on error, so callers can use file.tell() to see how
            # far the transfer got.
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)
914
915    def _check_sendfile_params(self, sock, file, offset, count):
916        if 'b' not in getattr(file, 'mode', 'b'):
917            raise ValueError("file should be opened in binary mode")
918        if not sock.type == socket.SOCK_STREAM:
919            raise ValueError("only SOCK_STREAM type sockets are supported")
920        if count is not None:
921            if not isinstance(count, int):
922                raise TypeError(
923                    "count must be a positive integer (got {!r})".format(count))
924            if count <= 0:
925                raise ValueError(
926                    "count must be a positive integer (got {!r})".format(count))
927        if not isinstance(offset, int):
928            raise TypeError(
929                "offset must be a non-negative integer (got {!r})".format(
930                    offset))
931        if offset < 0:
932            raise ValueError(
933                "offset must be a non-negative integer (got {!r})".format(
934                    offset))
935
    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
        """Create, bind and connect one socket."""
        # Errors from this single attempt are collected into their own
        # sub-list that is appended to the caller's *exceptions* list,
        # so that create_connection()/staggered_race() can report
        # per-attempt failures after all attempts are done.
        my_exceptions = []
        exceptions.append(my_exceptions)
        family, type_, proto, _, address = addr_info
        sock = None
        try:
            sock = socket.socket(family=family, type=type_, proto=proto)
            sock.setblocking(False)
            if local_addr_infos is not None:
                # Try each candidate local address; stop at the first
                # successful bind.
                for _, _, _, _, laddr in local_addr_infos:
                    try:
                        sock.bind(laddr)
                        break
                    except OSError as exc:
                        # Rewrap with a message naming the address that
                        # failed; keep the original errno.
                        msg = (
                            f'error while attempting to bind on '
                            f'address {laddr!r}: '
                            f'{exc.strerror.lower()}'
                        )
                        exc = OSError(exc.errno, msg)
                        my_exceptions.append(exc)
                else:  # all bind attempts failed
                    raise my_exceptions.pop()
            await self.sock_connect(sock, address)
            return sock
        except OSError as exc:
            my_exceptions.append(exc)
            if sock is not None:
                sock.close()
            raise
        except:
            # Any other failure (e.g. cancellation): close the socket
            # before re-raising, without recording it as a connect error.
            if sock is not None:
                sock.close()
            raise
971
    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0,
            proto=0, flags=0, sock=None,
            local_addr=None, server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background.  When successful, the coroutine returns a
        (transport, protocol) pair.
        """
        # --- argument validation ---------------------------------------
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            # --- resolve and connect ourselves --------------------------
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            # Each connect attempt appends its own sub-list of errors to
            # *exceptions* (see _connect_sock).
            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                # Try each resolved address sequentially until one works.
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        continue
            else:  # using happy eyeballs
                # Race staggered connection attempts (RFC 8305); the
                # first to succeed wins, the rest are cancelled.
                sock, _, _ = await staggered.staggered_race(
                    (functools.partial(self._connect_sock,
                                       exceptions, addrinfo, laddr_infos)
                     for addrinfo in infos),
                    happy_eyeballs_delay, loop=self)

            if sock is None:
                # All attempts failed; flatten the per-attempt sub-lists.
                exceptions = [exc for sub in exceptions for exc in sub]
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        else:
            # --- caller supplied an already-connected socket -------------
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method, breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol
1099
1100    async def _create_connection_transport(
1101            self, sock, protocol_factory, ssl,
1102            server_hostname, server_side=False,
1103            ssl_handshake_timeout=None):
1104
1105        sock.setblocking(False)
1106
1107        protocol = protocol_factory()
1108        waiter = self.create_future()
1109        if ssl:
1110            sslcontext = None if isinstance(ssl, bool) else ssl
1111            transport = self._make_ssl_transport(
1112                sock, protocol, sslcontext, waiter,
1113                server_side=server_side, server_hostname=server_hostname,
1114                ssl_handshake_timeout=ssl_handshake_timeout)
1115        else:
1116            transport = self._make_socket_transport(sock, protocol, waiter)
1117
1118        try:
1119            await waiter
1120        except:
1121            transport.close()
1122            raise
1123
1124        return transport, protocol
1125
1126    async def sendfile(self, transport, file, offset=0, count=None,
1127                       *, fallback=True):
1128        """Send a file to transport.
1129
1130        Return the total number of bytes which were sent.
1131
1132        The method uses high-performance os.sendfile if available.
1133
1134        file must be a regular file object opened in binary mode.
1135
1136        offset tells from where to start reading the file. If specified,
1137        count is the total number of bytes to transmit as opposed to
1138        sending the file until EOF is reached. File position is updated on
1139        return or also in case of error in which case file.tell()
1140        can be used to figure out the number of bytes
1141        which were sent.
1142
1143        fallback set to True makes asyncio to manually read and send
1144        the file when the platform does not support the sendfile syscall
1145        (e.g. Windows or SSL socket on Unix).
1146
1147        Raise SendfileNotAvailableError if the system does not support
1148        sendfile syscall and fallback is False.
1149        """
1150        if transport.is_closing():
1151            raise RuntimeError("Transport is closing")
1152        mode = getattr(transport, '_sendfile_compatible',
1153                       constants._SendfileMode.UNSUPPORTED)
1154        if mode is constants._SendfileMode.UNSUPPORTED:
1155            raise RuntimeError(
1156                f"sendfile is not supported for transport {transport!r}")
1157        if mode is constants._SendfileMode.TRY_NATIVE:
1158            try:
1159                return await self._sendfile_native(transport, file,
1160                                                   offset, count)
1161            except exceptions.SendfileNotAvailableError as exc:
1162                if not fallback:
1163                    raise
1164
1165        if not fallback:
1166            raise RuntimeError(
1167                f"fallback is disabled and native sendfile is not "
1168                f"supported for transport {transport!r}")
1169
1170        return await self._sendfile_fallback(transport, file,
1171                                             offset, count)
1172
    async def _sendfile_native(self, transp, file, offset, count):
        # Base implementation: no OS-level sendfile support here;
        # subclasses with a native implementation override this.
        raise exceptions.SendfileNotAvailableError(
            "sendfile syscall is not supported")
1176
    async def _sendfile_fallback(self, transp, file, offset, count):
        # Portable sendfile emulation over a transport: read chunks in
        # the default executor and write them to the transport,
        # respecting flow control via the temporary protocol.
        if offset:
            file.seek(offset)
        blocksize = min(count, 16384) if count else 16384
        buf = bytearray(blocksize)
        total_sent = 0
        # Temporarily swap in a protocol that exposes drain() so we can
        # honor the transport's write buffer limits.
        proto = _SendfileFallbackProtocol(transp)
        try:
            while True:
                if count:
                    # Clamp the final read so no more than *count* bytes
                    # are transmitted in total.
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        return total_sent
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    return total_sent  # EOF
                # Wait for the transport to be writable before queueing
                # more data.
                await proto.drain()
                transp.write(view[:read])
                total_sent += read
        finally:
            # Leave file position just past the data sent (even on
            # error), then restore the transport's original protocol.
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)
            await proto.restore()
1201
    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None):
        """Upgrade transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        # ssl is the module (or None if the import at the top failed).
        if ssl is None:
            raise RuntimeError('Python ssl module is not available')

        if not isinstance(sslcontext, ssl.SSLContext):
            raise TypeError(
                f'sslcontext is expected to be an instance of ssl.SSLContext, '
                f'got {sslcontext!r}')

        if not getattr(transport, '_start_tls_compatible', False):
            raise TypeError(
                f'transport {transport!r} is not supported by start_tls()')

        waiter = self.create_future()
        # The SSL protocol wraps the app protocol; connection_made is
        # deferred (call_connection_made=False) so we can control the
        # ordering below.
        ssl_protocol = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            call_connection_made=False)

        # Pause early so that "ssl_protocol.data_received()" doesn't
        # have a chance to get called before "ssl_protocol.connection_made()".
        transport.pause_reading()

        transport.set_protocol(ssl_protocol)
        # Schedule connection_made first, then resume reading, so data
        # can never arrive before the protocol is fully set up.
        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
        resume_cb = self.call_soon(transport.resume_reading)

        try:
            await waiter
        except BaseException:
            # Handshake failed or was cancelled: tear everything down
            # and cancel the pending callbacks before re-raising.
            transport.close()
            conmade_cb.cancel()
            resume_cb.cancel()
            raise

        return ssl_protocol._app_transport
1247
    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=_unset, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Create datagram connection."""
        if sock is not None:
            # A pre-made socket was supplied: validate it and reject any
            # socket-configuration keyword that would conflict with it.
            if sock.type != socket.SOCK_DGRAM:
                raise ValueError(
                    f'A UDP Socket was expected, got {sock!r}')
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_address=reuse_address, reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
                raise ValueError(
                    f'socket modifier keyword arguments can not be used '
                    f'when sock is specified. ({problems})')
            sock.setblocking(False)
            r_addr = None
        else:
            # Build the list of ((family, proto), (local, remote))
            # candidate pairs to try, depending on the address style.
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                addr_pairs_info = (((family, proto), (None, None)),)
            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
                # Unix-domain datagram sockets take string paths, not
                # (host, port) tuples; no resolution needed.
                for addr in (local_addr, remote_addr):
                    if addr is not None and not isinstance(addr, str):
                        raise TypeError('string is expected')

                # Skip abstract-namespace addresses (leading 0 / '\x00');
                # for filesystem paths, remove a stale socket file so
                # bind() below does not fail with EADDRINUSE.
                if local_addr and local_addr[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                            os.remove(local_addr)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r',
                                     local_addr, err)

                addr_pairs_info = (((family, proto),
                                    (local_addr, remote_addr)), )
            else:
                # join address by (family, protocol)
                addr_infos = {}  # Using order preserving dict
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        assert isinstance(addr, tuple) and len(addr) == 2, (
                            '2-tuple is expected')

                        infos = await self._ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        # idx 0 = local side, idx 1 = remote side.
                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            exceptions = []

            # bpo-37228
            if reuse_address is not _unset:
                if reuse_address:
                    raise ValueError("Passing `reuse_address=True` is no "
                                     "longer supported, as the usage of "
                                     "SO_REUSEPORT in UDP poses a significant "
                                     "security concern.")
                else:
                    warnings.warn("The *reuse_address* parameter has been "
                                  "deprecated as of 3.5.10 and is scheduled "
                                  "for removal in 3.11.", DeprecationWarning,
                                  stacklevel=2)

            # Try each candidate (family, proto) pair until a socket can
            # be created, bound and (if requested) connected.
            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        # With broadcast enabled the socket stays
                        # unconnected; the remote address is kept for
                        # sendto() instead.
                        if not allow_broadcast:
                            await self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                # Every candidate failed: report the first error.
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            await waiter
        except:
            transport.close()
            raise

        return transport, protocol
1394
1395    async def _ensure_resolved(self, address, *,
1396                               family=0, type=socket.SOCK_STREAM,
1397                               proto=0, flags=0, loop):
1398        host, port = address[:2]
1399        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1400        if info is not None:
1401            # "host" is already a resolved IP.
1402            return [info]
1403        else:
1404            return await loop.getaddrinfo(host, port, family=family, type=type,
1405                                          proto=proto, flags=flags)
1406
1407    async def _create_server_getaddrinfo(self, host, port, family, flags):
1408        infos = await self._ensure_resolved((host, port), family=family,
1409                                            type=socket.SOCK_STREAM,
1410                                            flags=flags, loop=self)
1411        if not infos:
1412            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1413        return infos
1414
    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a TCP server.

        The host parameter can be a string, in that case the TCP server is
        bound to host and port.

        The host parameter can also be a sequence of strings and in that case
        the TCP server is bound to all hosts of the sequence. If a host
        appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to that
        host.

        Return a Server object which can be used to stop the service.

        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if host is not None or port is not None:
            # Resolve and bind the listening sockets ourselves.
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            if reuse_address is None:
                # SO_REUSEADDR is safe and conventional for TCP servers
                # on POSIX; on Windows it has different (unsafe) semantics.
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                # Empty string means "all interfaces" (wildcard bind).
                hosts = [None]
            elif (isinstance(host, str) or
                  not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host

            # Resolve all hosts concurrently; de-duplicate addresses that
            # several hostnames resolve to.
            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs)
            infos = set(itertools.chain.from_iterable(infos))

            # *completed* guards the cleanup below: if anything fails
            # part-way, every socket created so far is closed.
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    if reuse_port:
                        _set_reuseport(sock)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower())) from None
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            # Caller supplied a pre-bound socket.
            if sock is None:
                raise ValueError('Neither host/port nor sock were specified')
            if sock.type != socket.SOCK_STREAM:
                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
            sockets = [sock]

        for sock in sockets:
            sock.setblocking(False)

        server = Server(self, sockets, protocol_factory,
                        ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        if self._debug:
            logger.info("%r is serving", server)
        return server
1534
1535    async def connect_accepted_socket(
1536            self, protocol_factory, sock,
1537            *, ssl=None,
1538            ssl_handshake_timeout=None):
1539        if sock.type != socket.SOCK_STREAM:
1540            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1541
1542        if ssl_handshake_timeout is not None and not ssl:
1543            raise ValueError(
1544                'ssl_handshake_timeout is only meaningful with ssl')
1545
1546        if sock is not None:
1547            _check_ssl_socket(sock)
1548
1549        transport, protocol = await self._create_connection_transport(
1550            sock, protocol_factory, ssl, '', server_side=True,
1551            ssl_handshake_timeout=ssl_handshake_timeout)
1552        if self._debug:
1553            # Get the socket from the transport because SSL transport closes
1554            # the old socket and creates a new SSL socket
1555            sock = transport.get_extra_info('socket')
1556            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1557        return transport, protocol
1558
1559    async def connect_read_pipe(self, protocol_factory, pipe):
1560        protocol = protocol_factory()
1561        waiter = self.create_future()
1562        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1563
1564        try:
1565            await waiter
1566        except:
1567            transport.close()
1568            raise
1569
1570        if self._debug:
1571            logger.debug('Read pipe %r connected: (%r, %r)',
1572                         pipe.fileno(), transport, protocol)
1573        return transport, protocol
1574
1575    async def connect_write_pipe(self, protocol_factory, pipe):
1576        protocol = protocol_factory()
1577        waiter = self.create_future()
1578        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1579
1580        try:
1581            await waiter
1582        except:
1583            transport.close()
1584            raise
1585
1586        if self._debug:
1587            logger.debug('Write pipe %r connected: (%r, %r)',
1588                         pipe.fileno(), transport, protocol)
1589        return transport, protocol
1590
1591    def _log_subprocess(self, msg, stdin, stdout, stderr):
1592        info = [msg]
1593        if stdin is not None:
1594            info.append(f'stdin={_format_pipe(stdin)}')
1595        if stdout is not None and stderr == subprocess.STDOUT:
1596            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1597        else:
1598            if stdout is not None:
1599                info.append(f'stdout={_format_pipe(stdout)}')
1600            if stderr is not None:
1601                info.append(f'stderr={_format_pipe(stderr)}')
1602        logger.debug(' '.join(info))
1603
1604    async def subprocess_shell(self, protocol_factory, cmd, *,
1605                               stdin=subprocess.PIPE,
1606                               stdout=subprocess.PIPE,
1607                               stderr=subprocess.PIPE,
1608                               universal_newlines=False,
1609                               shell=True, bufsize=0,
1610                               encoding=None, errors=None, text=None,
1611                               **kwargs):
1612        if not isinstance(cmd, (bytes, str)):
1613            raise ValueError("cmd must be a string")
1614        if universal_newlines:
1615            raise ValueError("universal_newlines must be False")
1616        if not shell:
1617            raise ValueError("shell must be True")
1618        if bufsize != 0:
1619            raise ValueError("bufsize must be 0")
1620        if text:
1621            raise ValueError("text must be False")
1622        if encoding is not None:
1623            raise ValueError("encoding must be None")
1624        if errors is not None:
1625            raise ValueError("errors must be None")
1626
1627        protocol = protocol_factory()
1628        debug_log = None
1629        if self._debug:
1630            # don't log parameters: they may contain sensitive information
1631            # (password) and may be too long
1632            debug_log = 'run shell command %r' % cmd
1633            self._log_subprocess(debug_log, stdin, stdout, stderr)
1634        transport = await self._make_subprocess_transport(
1635            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1636        if self._debug and debug_log is not None:
1637            logger.info('%s: %r', debug_log, transport)
1638        return transport, protocol
1639
1640    async def subprocess_exec(self, protocol_factory, program, *args,
1641                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1642                              stderr=subprocess.PIPE, universal_newlines=False,
1643                              shell=False, bufsize=0,
1644                              encoding=None, errors=None, text=None,
1645                              **kwargs):
1646        if universal_newlines:
1647            raise ValueError("universal_newlines must be False")
1648        if shell:
1649            raise ValueError("shell must be False")
1650        if bufsize != 0:
1651            raise ValueError("bufsize must be 0")
1652        if text:
1653            raise ValueError("text must be False")
1654        if encoding is not None:
1655            raise ValueError("encoding must be None")
1656        if errors is not None:
1657            raise ValueError("errors must be None")
1658
1659        popen_args = (program,) + args
1660        protocol = protocol_factory()
1661        debug_log = None
1662        if self._debug:
1663            # don't log parameters: they may contain sensitive information
1664            # (password) and may be too long
1665            debug_log = f'execute program {program!r}'
1666            self._log_subprocess(debug_log, stdin, stdout, stderr)
1667        transport = await self._make_subprocess_transport(
1668            protocol, popen_args, False, stdin, stdout, stderr,
1669            bufsize, **kwargs)
1670        if self._debug and debug_log is not None:
1671            logger.info('%s: %r', debug_log, transport)
1672        return transport, protocol
1673
1674    def get_exception_handler(self):
1675        """Return an exception handler, or None if the default one is in use.
1676        """
1677        return self._exception_handler
1678
1679    def set_exception_handler(self, handler):
1680        """Set handler as the new event loop exception handler.
1681
1682        If handler is None, the default exception handler will
1683        be set.
1684
1685        If handler is a callable object, it should have a
1686        signature matching '(loop, context)', where 'loop'
1687        will be a reference to the active event loop, 'context'
1688        will be a dict object (see `call_exception_handler()`
1689        documentation for details about context).
1690        """
1691        if handler is not None and not callable(handler):
1692            raise TypeError(f'A callable object or None is expected, '
1693                            f'got {handler!r}')
1694        self._exception_handler = handler
1695
1696    def default_exception_handler(self, context):
1697        """Default exception handler.
1698
1699        This is called when an exception occurs and no exception
1700        handler is set, and can be called by a custom exception
1701        handler that wants to defer to the default behavior.
1702
1703        This default handler logs the error message and other
1704        context-dependent information.  In debug mode, a truncated
1705        stack trace is also appended showing where the given object
1706        (e.g. a handle or future or task) was created, if any.
1707
1708        The context parameter has the same meaning as in
1709        `call_exception_handler()`.
1710        """
1711        message = context.get('message')
1712        if not message:
1713            message = 'Unhandled exception in event loop'
1714
1715        exception = context.get('exception')
1716        if exception is not None:
1717            exc_info = (type(exception), exception, exception.__traceback__)
1718        else:
1719            exc_info = False
1720
1721        if ('source_traceback' not in context and
1722                self._current_handle is not None and
1723                self._current_handle._source_traceback):
1724            context['handle_traceback'] = \
1725                self._current_handle._source_traceback
1726
1727        log_lines = [message]
1728        for key in sorted(context):
1729            if key in {'message', 'exception'}:
1730                continue
1731            value = context[key]
1732            if key == 'source_traceback':
1733                tb = ''.join(traceback.format_list(value))
1734                value = 'Object created at (most recent call last):\n'
1735                value += tb.rstrip()
1736            elif key == 'handle_traceback':
1737                tb = ''.join(traceback.format_list(value))
1738                value = 'Handle created at (most recent call last):\n'
1739                value += tb.rstrip()
1740            else:
1741                value = repr(value)
1742            log_lines.append(f'{key}: {value}')
1743
1744        logger.error('\n'.join(log_lines), exc_info=exc_info)
1745
1746    def call_exception_handler(self, context):
1747        """Call the current event loop's exception handler.
1748
1749        The context argument is a dict containing the following keys:
1750
1751        - 'message': Error message;
1752        - 'exception' (optional): Exception object;
1753        - 'future' (optional): Future instance;
1754        - 'task' (optional): Task instance;
1755        - 'handle' (optional): Handle instance;
1756        - 'protocol' (optional): Protocol instance;
1757        - 'transport' (optional): Transport instance;
1758        - 'socket' (optional): Socket instance;
1759        - 'asyncgen' (optional): Asynchronous generator that caused
1760                                 the exception.
1761
1762        New keys maybe introduced in the future.
1763
1764        Note: do not overload this method in an event loop subclass.
1765        For custom exception handling, use the
1766        `set_exception_handler()` method.
1767        """
1768        if self._exception_handler is None:
1769            try:
1770                self.default_exception_handler(context)
1771            except (SystemExit, KeyboardInterrupt):
1772                raise
1773            except BaseException:
1774                # Second protection layer for unexpected errors
1775                # in the default implementation, as well as for subclassed
1776                # event loops with overloaded "default_exception_handler".
1777                logger.error('Exception in default exception handler',
1778                             exc_info=True)
1779        else:
1780            try:
1781                self._exception_handler(self, context)
1782            except (SystemExit, KeyboardInterrupt):
1783                raise
1784            except BaseException as exc:
1785                # Exception in the user set custom exception handler.
1786                try:
1787                    # Let's try default handler.
1788                    self.default_exception_handler({
1789                        'message': 'Unhandled error in exception handler',
1790                        'exception': exc,
1791                        'context': context,
1792                    })
1793                except (SystemExit, KeyboardInterrupt):
1794                    raise
1795                except BaseException:
1796                    # Guard 'default_exception_handler' in case it is
1797                    # overloaded.
1798                    logger.error('Exception in default exception handler '
1799                                 'while handling an unexpected error '
1800                                 'in custom exception handler',
1801                                 exc_info=True)
1802
1803    def _add_callback(self, handle):
1804        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1805        assert isinstance(handle, events.Handle), 'A Handle is required here'
1806        if handle._cancelled:
1807            return
1808        assert not isinstance(handle, events.TimerHandle)
1809        self._ready.append(handle)
1810
1811    def _add_callback_signalsafe(self, handle):
1812        """Like _add_callback() but called from a signal handler."""
1813        self._add_callback(handle)
1814        self._write_to_self()
1815
1816    def _timer_handle_cancelled(self, handle):
1817        """Notification that a TimerHandle has been cancelled."""
1818        if handle._scheduled:
1819            self._timer_cancelled_count += 1
1820
    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        # Purge cancelled timers.  When at least _MIN_SCHEDULED_TIMER_HANDLES
        # timers are scheduled and more than
        # _MIN_CANCELLED_TIMER_HANDLES_FRACTION of them are cancelled,
        # rebuild the whole heap; otherwise just pop cancelled timers off
        # the head (cheap, keeps the heap invariant for free).
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # Decide how long select() may block: 0 when callbacks are already
        # ready (or the loop is stopping); otherwise until the earliest
        # timer, capped at MAXIMUM_SELECT_TIMEOUT; None blocks forever.
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        # Poll for I/O and let the subclass turn events into ready handles.
        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # end_time includes the clock resolution so timers due "now" are
        # not missed by an exact float comparison.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                # In debug mode, time each callback and warn about ones
                # that block the loop for too long; _current_handle lets
                # the exception handler attribute errors to this handle.
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
1898
1899    def _set_coroutine_origin_tracking(self, enabled):
1900        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1901            return
1902
1903        if enabled:
1904            self._coroutine_origin_tracking_saved_depth = (
1905                sys.get_coroutine_origin_tracking_depth())
1906            sys.set_coroutine_origin_tracking_depth(
1907                constants.DEBUG_STACK_DEPTH)
1908        else:
1909            sys.set_coroutine_origin_tracking_depth(
1910                self._coroutine_origin_tracking_saved_depth)
1911
1912        self._coroutine_origin_tracking_enabled = enabled
1913
1914    def get_debug(self):
1915        return self._debug
1916
1917    def set_debug(self, enabled):
1918        self._debug = enabled
1919
1920        if self.is_running():
1921            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1922