• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import errno
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import timeouts
48from . import transports
49from . import trsock
50from .log import logger
51
52
53__all__ = 'BaseEventLoop','Server',
54
55
56# Minimum number of _scheduled timer handles before cleanup of
57# cancelled handles is performed.
58_MIN_SCHEDULED_TIMER_HANDLES = 100
59
60# Minimum fraction of _scheduled timer handles that are cancelled
61# before cleanup of cancelled handles is performed.
62_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
63
64
65_HAS_IPv6 = hasattr(socket, 'AF_INET6')
66
67# Maximum timeout passed to select to avoid OS limitations
68MAXIMUM_SELECT_TIMEOUT = 24 * 3600
69
70
71def _format_handle(handle):
72    cb = handle._callback
73    if isinstance(getattr(cb, '__self__', None), tasks.Task):
74        # format the task
75        return repr(cb.__self__)
76    else:
77        return str(handle)
78
79
80def _format_pipe(fd):
81    if fd == subprocess.PIPE:
82        return '<pipe>'
83    elif fd == subprocess.STDOUT:
84        return '<stdout>'
85    else:
86        return repr(fd)
87
88
89def _set_reuseport(sock):
90    if not hasattr(socket, 'SO_REUSEPORT'):
91        raise ValueError('reuse_port not supported by socket module')
92    else:
93        try:
94            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
95        except OSError:
96            raise ValueError('reuse_port not supported by socket module, '
97                             'SO_REUSEPORT defined but not implemented.')
98
99
100def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
101    # Try to skip getaddrinfo if "host" is already an IP. Users might have
102    # handled name resolution in their own code and pass in resolved IPs.
103    if not hasattr(socket, 'inet_pton'):
104        return
105
106    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
107            host is None:
108        return None
109
110    if type == socket.SOCK_STREAM:
111        proto = socket.IPPROTO_TCP
112    elif type == socket.SOCK_DGRAM:
113        proto = socket.IPPROTO_UDP
114    else:
115        return None
116
117    if port is None:
118        port = 0
119    elif isinstance(port, bytes) and port == b'':
120        port = 0
121    elif isinstance(port, str) and port == '':
122        port = 0
123    else:
124        # If port's a service name like "http", don't skip getaddrinfo.
125        try:
126            port = int(port)
127        except (TypeError, ValueError):
128            return None
129
130    if family == socket.AF_UNSPEC:
131        afs = [socket.AF_INET]
132        if _HAS_IPv6:
133            afs.append(socket.AF_INET6)
134    else:
135        afs = [family]
136
137    if isinstance(host, bytes):
138        host = host.decode('idna')
139    if '%' in host:
140        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
141        # like '::1%lo0'.
142        return None
143
144    for af in afs:
145        try:
146            socket.inet_pton(af, host)
147            # The host has already been resolved.
148            if _HAS_IPv6 and af == socket.AF_INET6:
149                return af, type, proto, '', (host, port, flowinfo, scopeid)
150            else:
151                return af, type, proto, '', (host, port)
152        except OSError:
153            pass
154
155    # "host" is not an IP address.
156    return None
157
158
159def _interleave_addrinfos(addrinfos, first_address_family_count=1):
160    """Interleave list of addrinfo tuples by family."""
161    # Group addresses by family
162    addrinfos_by_family = collections.OrderedDict()
163    for addr in addrinfos:
164        family = addr[0]
165        if family not in addrinfos_by_family:
166            addrinfos_by_family[family] = []
167        addrinfos_by_family[family].append(addr)
168    addrinfos_lists = list(addrinfos_by_family.values())
169
170    reordered = []
171    if first_address_family_count > 1:
172        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
173        del addrinfos_lists[0][:first_address_family_count - 1]
174    reordered.extend(
175        a for a in itertools.chain.from_iterable(
176            itertools.zip_longest(*addrinfos_lists)
177        ) if a is not None)
178    return reordered
179
180
181def _run_until_complete_cb(fut):
182    if not fut.cancelled():
183        exc = fut.exception()
184        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
185            # Issue #22429: run_forever() already finished, no need to
186            # stop it.
187            return
188    futures._get_loop(fut).stop()
189
190
191if hasattr(socket, 'TCP_NODELAY'):
192    def _set_nodelay(sock):
193        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
194                sock.type == socket.SOCK_STREAM and
195                sock.proto == socket.IPPROTO_TCP):
196            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
197else:
198    def _set_nodelay(sock):
199        pass
200
201
202def _check_ssl_socket(sock):
203    if ssl is not None and isinstance(sock, ssl.SSLSocket):
204        raise TypeError("Socket cannot be of type SSLSocket")
205
206
207class _SendfileFallbackProtocol(protocols.Protocol):
208    def __init__(self, transp):
209        if not isinstance(transp, transports._FlowControlMixin):
210            raise TypeError("transport should be _FlowControlMixin instance")
211        self._transport = transp
212        self._proto = transp.get_protocol()
213        self._should_resume_reading = transp.is_reading()
214        self._should_resume_writing = transp._protocol_paused
215        transp.pause_reading()
216        transp.set_protocol(self)
217        if self._should_resume_writing:
218            self._write_ready_fut = self._transport._loop.create_future()
219        else:
220            self._write_ready_fut = None
221
222    async def drain(self):
223        if self._transport.is_closing():
224            raise ConnectionError("Connection closed by peer")
225        fut = self._write_ready_fut
226        if fut is None:
227            return
228        await fut
229
230    def connection_made(self, transport):
231        raise RuntimeError("Invalid state: "
232                           "connection should have been established already.")
233
234    def connection_lost(self, exc):
235        if self._write_ready_fut is not None:
236            # Never happens if peer disconnects after sending the whole content
237            # Thus disconnection is always an exception from user perspective
238            if exc is None:
239                self._write_ready_fut.set_exception(
240                    ConnectionError("Connection is closed by peer"))
241            else:
242                self._write_ready_fut.set_exception(exc)
243        self._proto.connection_lost(exc)
244
245    def pause_writing(self):
246        if self._write_ready_fut is not None:
247            return
248        self._write_ready_fut = self._transport._loop.create_future()
249
250    def resume_writing(self):
251        if self._write_ready_fut is None:
252            return
253        self._write_ready_fut.set_result(False)
254        self._write_ready_fut = None
255
256    def data_received(self, data):
257        raise RuntimeError("Invalid state: reading should be paused")
258
259    def eof_received(self):
260        raise RuntimeError("Invalid state: reading should be paused")
261
262    async def restore(self):
263        self._transport.set_protocol(self._proto)
264        if self._should_resume_reading:
265            self._transport.resume_reading()
266        if self._write_ready_fut is not None:
267            # Cancel the future.
268            # Basically it has no effect because protocol is switched back,
269            # no code should wait for it anymore.
270            self._write_ready_fut.cancel()
271        if self._should_resume_writing:
272            self._proto.resume_writing()
273
274
275class Server(events.AbstractServer):
276
277    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
278                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
279        self._loop = loop
280        self._sockets = sockets
281        # Weak references so we don't break Transport's ability to
282        # detect abandoned transports
283        self._clients = weakref.WeakSet()
284        self._waiters = []
285        self._protocol_factory = protocol_factory
286        self._backlog = backlog
287        self._ssl_context = ssl_context
288        self._ssl_handshake_timeout = ssl_handshake_timeout
289        self._ssl_shutdown_timeout = ssl_shutdown_timeout
290        self._serving = False
291        self._serving_forever_fut = None
292
293    def __repr__(self):
294        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
295
296    def _attach(self, transport):
297        assert self._sockets is not None
298        self._clients.add(transport)
299
300    def _detach(self, transport):
301        self._clients.discard(transport)
302        if len(self._clients) == 0 and self._sockets is None:
303            self._wakeup()
304
305    def _wakeup(self):
306        waiters = self._waiters
307        self._waiters = None
308        for waiter in waiters:
309            if not waiter.done():
310                waiter.set_result(None)
311
312    def _start_serving(self):
313        if self._serving:
314            return
315        self._serving = True
316        for sock in self._sockets:
317            sock.listen(self._backlog)
318            self._loop._start_serving(
319                self._protocol_factory, sock, self._ssl_context,
320                self, self._backlog, self._ssl_handshake_timeout,
321                self._ssl_shutdown_timeout)
322
323    def get_loop(self):
324        return self._loop
325
326    def is_serving(self):
327        return self._serving
328
329    @property
330    def sockets(self):
331        if self._sockets is None:
332            return ()
333        return tuple(trsock.TransportSocket(s) for s in self._sockets)
334
335    def close(self):
336        sockets = self._sockets
337        if sockets is None:
338            return
339        self._sockets = None
340
341        for sock in sockets:
342            self._loop._stop_serving(sock)
343
344        self._serving = False
345
346        if (self._serving_forever_fut is not None and
347                not self._serving_forever_fut.done()):
348            self._serving_forever_fut.cancel()
349            self._serving_forever_fut = None
350
351        if len(self._clients) == 0:
352            self._wakeup()
353
354    def close_clients(self):
355        for transport in self._clients.copy():
356            transport.close()
357
358    def abort_clients(self):
359        for transport in self._clients.copy():
360            transport.abort()
361
362    async def start_serving(self):
363        self._start_serving()
364        # Skip one loop iteration so that all 'loop.add_reader'
365        # go through.
366        await tasks.sleep(0)
367
368    async def serve_forever(self):
369        if self._serving_forever_fut is not None:
370            raise RuntimeError(
371                f'server {self!r} is already being awaited on serve_forever()')
372        if self._sockets is None:
373            raise RuntimeError(f'server {self!r} is closed')
374
375        self._start_serving()
376        self._serving_forever_fut = self._loop.create_future()
377
378        try:
379            await self._serving_forever_fut
380        except exceptions.CancelledError:
381            try:
382                self.close()
383                await self.wait_closed()
384            finally:
385                raise
386        finally:
387            self._serving_forever_fut = None
388
389    async def wait_closed(self):
390        """Wait until server is closed and all connections are dropped.
391
392        - If the server is not closed, wait.
393        - If it is closed, but there are still active connections, wait.
394
395        Anyone waiting here will be unblocked once both conditions
396        (server is closed and all connections have been dropped)
397        have become true, in either order.
398
399        Historical note: In 3.11 and before, this was broken, returning
400        immediately if the server was already closed, even if there
401        were still active connections. An attempted fix in 3.12.0 was
402        still broken, returning immediately if the server was still
403        open and there were no active connections. Hopefully in 3.12.1
404        we have it right.
405        """
406        # Waiters are unblocked by self._wakeup(), which is called
407        # from two places: self.close() and self._detach(), but only
408        # when both conditions have become true. To signal that this
409        # has happened, self._wakeup() sets self._waiters to None.
410        if self._waiters is None:
411            return
412        waiter = self._loop.create_future()
413        self._waiters.append(waiter)
414        await waiter
415
416
417class BaseEventLoop(events.AbstractEventLoop):
418
419    def __init__(self):
420        self._timer_cancelled_count = 0
421        self._closed = False
422        self._stopping = False
423        self._ready = collections.deque()
424        self._scheduled = []
425        self._default_executor = None
426        self._internal_fds = 0
427        # Identifier of the thread running the event loop, or None if the
428        # event loop is not running
429        self._thread_id = None
430        self._clock_resolution = time.get_clock_info('monotonic').resolution
431        self._exception_handler = None
432        self.set_debug(coroutines._is_debug_mode())
433        # The preserved state of async generator hooks.
434        self._old_agen_hooks = None
435        # In debug mode, if the execution of a callback or a step of a task
436        # exceed this duration in seconds, the slow callback/task is logged.
437        self.slow_callback_duration = 0.1
438        self._current_handle = None
439        self._task_factory = None
440        self._coroutine_origin_tracking_enabled = False
441        self._coroutine_origin_tracking_saved_depth = None
442
443        # A weak set of all asynchronous generators that are
444        # being iterated by the loop.
445        self._asyncgens = weakref.WeakSet()
446        # Set to True when `loop.shutdown_asyncgens` is called.
447        self._asyncgens_shutdown_called = False
448        # Set to True when `loop.shutdown_default_executor` is called.
449        self._executor_shutdown_called = False
450
451    def __repr__(self):
452        return (
453            f'<{self.__class__.__name__} running={self.is_running()} '
454            f'closed={self.is_closed()} debug={self.get_debug()}>'
455        )
456
457    def create_future(self):
458        """Create a Future object attached to the loop."""
459        return futures.Future(loop=self)
460
461    def create_task(self, coro, *, name=None, context=None):
462        """Schedule a coroutine object.
463
464        Return a task object.
465        """
466        self._check_closed()
467        if self._task_factory is None:
468            task = tasks.Task(coro, loop=self, name=name, context=context)
469            if task._source_traceback:
470                del task._source_traceback[-1]
471        else:
472            if context is None:
473                # Use legacy API if context is not needed
474                task = self._task_factory(self, coro)
475            else:
476                task = self._task_factory(self, coro, context=context)
477
478            task.set_name(name)
479
480        return task
481
482    def set_task_factory(self, factory):
483        """Set a task factory that will be used by loop.create_task().
484
485        If factory is None the default task factory will be set.
486
487        If factory is a callable, it should have a signature matching
488        '(loop, coro)', where 'loop' will be a reference to the active
489        event loop, 'coro' will be a coroutine object.  The callable
490        must return a Future.
491        """
492        if factory is not None and not callable(factory):
493            raise TypeError('task factory must be a callable or None')
494        self._task_factory = factory
495
496    def get_task_factory(self):
497        """Return a task factory, or None if the default one is in use."""
498        return self._task_factory
499
500    def _make_socket_transport(self, sock, protocol, waiter=None, *,
501                               extra=None, server=None):
502        """Create socket transport."""
503        raise NotImplementedError
504
505    def _make_ssl_transport(
506            self, rawsock, protocol, sslcontext, waiter=None,
507            *, server_side=False, server_hostname=None,
508            extra=None, server=None,
509            ssl_handshake_timeout=None,
510            ssl_shutdown_timeout=None,
511            call_connection_made=True):
512        """Create SSL transport."""
513        raise NotImplementedError
514
515    def _make_datagram_transport(self, sock, protocol,
516                                 address=None, waiter=None, extra=None):
517        """Create datagram transport."""
518        raise NotImplementedError
519
520    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
521                                  extra=None):
522        """Create read pipe transport."""
523        raise NotImplementedError
524
525    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
526                                   extra=None):
527        """Create write pipe transport."""
528        raise NotImplementedError
529
530    async def _make_subprocess_transport(self, protocol, args, shell,
531                                         stdin, stdout, stderr, bufsize,
532                                         extra=None, **kwargs):
533        """Create subprocess transport."""
534        raise NotImplementedError
535
536    def _write_to_self(self):
537        """Write a byte to self-pipe, to wake up the event loop.
538
539        This may be called from a different thread.
540
541        The subclass is responsible for implementing the self-pipe.
542        """
543        raise NotImplementedError
544
545    def _process_events(self, event_list):
546        """Process selector events."""
547        raise NotImplementedError
548
549    def _check_closed(self):
550        if self._closed:
551            raise RuntimeError('Event loop is closed')
552
553    def _check_default_executor(self):
554        if self._executor_shutdown_called:
555            raise RuntimeError('Executor shutdown has been called')
556
557    def _asyncgen_finalizer_hook(self, agen):
558        self._asyncgens.discard(agen)
559        if not self.is_closed():
560            self.call_soon_threadsafe(self.create_task, agen.aclose())
561
562    def _asyncgen_firstiter_hook(self, agen):
563        if self._asyncgens_shutdown_called:
564            warnings.warn(
565                f"asynchronous generator {agen!r} was scheduled after "
566                f"loop.shutdown_asyncgens() call",
567                ResourceWarning, source=self)
568
569        self._asyncgens.add(agen)
570
571    async def shutdown_asyncgens(self):
572        """Shutdown all active asynchronous generators."""
573        self._asyncgens_shutdown_called = True
574
575        if not len(self._asyncgens):
576            # If Python version is <3.6 or we don't have any asynchronous
577            # generators alive.
578            return
579
580        closing_agens = list(self._asyncgens)
581        self._asyncgens.clear()
582
583        results = await tasks.gather(
584            *[ag.aclose() for ag in closing_agens],
585            return_exceptions=True)
586
587        for result, agen in zip(results, closing_agens):
588            if isinstance(result, Exception):
589                self.call_exception_handler({
590                    'message': f'an error occurred during closing of '
591                               f'asynchronous generator {agen!r}',
592                    'exception': result,
593                    'asyncgen': agen
594                })
595
596    async def shutdown_default_executor(self, timeout=None):
597        """Schedule the shutdown of the default executor.
598
599        The timeout parameter specifies the amount of time the executor will
600        be given to finish joining. The default value is None, which means
601        that the executor will be given an unlimited amount of time.
602        """
603        self._executor_shutdown_called = True
604        if self._default_executor is None:
605            return
606        future = self.create_future()
607        thread = threading.Thread(target=self._do_shutdown, args=(future,))
608        thread.start()
609        try:
610            async with timeouts.timeout(timeout):
611                await future
612        except TimeoutError:
613            warnings.warn("The executor did not finishing joining "
614                          f"its threads within {timeout} seconds.",
615                          RuntimeWarning, stacklevel=2)
616            self._default_executor.shutdown(wait=False)
617        else:
618            thread.join()
619
620    def _do_shutdown(self, future):
621        try:
622            self._default_executor.shutdown(wait=True)
623            if not self.is_closed():
624                self.call_soon_threadsafe(futures._set_result_unless_cancelled,
625                                          future, None)
626        except Exception as ex:
627            if not self.is_closed() and not future.cancelled():
628                self.call_soon_threadsafe(future.set_exception, ex)
629
630    def _check_running(self):
631        if self.is_running():
632            raise RuntimeError('This event loop is already running')
633        if events._get_running_loop() is not None:
634            raise RuntimeError(
635                'Cannot run the event loop while another loop is running')
636
637    def _run_forever_setup(self):
638        """Prepare the run loop to process events.
639
640        This method exists so that custom custom event loop subclasses (e.g., event loops
641        that integrate a GUI event loop with Python's event loop) have access to all the
642        loop setup logic.
643        """
644        self._check_closed()
645        self._check_running()
646        self._set_coroutine_origin_tracking(self._debug)
647
648        self._old_agen_hooks = sys.get_asyncgen_hooks()
649        self._thread_id = threading.get_ident()
650        sys.set_asyncgen_hooks(
651            firstiter=self._asyncgen_firstiter_hook,
652            finalizer=self._asyncgen_finalizer_hook
653        )
654
655        events._set_running_loop(self)
656
657    def _run_forever_cleanup(self):
658        """Clean up after an event loop finishes the looping over events.
659
660        This method exists so that custom custom event loop subclasses (e.g., event loops
661        that integrate a GUI event loop with Python's event loop) have access to all the
662        loop cleanup logic.
663        """
664        self._stopping = False
665        self._thread_id = None
666        events._set_running_loop(None)
667        self._set_coroutine_origin_tracking(False)
668        # Restore any pre-existing async generator hooks.
669        if self._old_agen_hooks is not None:
670            sys.set_asyncgen_hooks(*self._old_agen_hooks)
671            self._old_agen_hooks = None
672
673    def run_forever(self):
674        """Run until stop() is called."""
675        try:
676            self._run_forever_setup()
677            while True:
678                self._run_once()
679                if self._stopping:
680                    break
681        finally:
682            self._run_forever_cleanup()
683
684    def run_until_complete(self, future):
685        """Run until the Future is done.
686
687        If the argument is a coroutine, it is wrapped in a Task.
688
689        WARNING: It would be disastrous to call run_until_complete()
690        with the same coroutine twice -- it would wrap it in two
691        different Tasks and that can't be good.
692
693        Return the Future's result, or raise its exception.
694        """
695        self._check_closed()
696        self._check_running()
697
698        new_task = not futures.isfuture(future)
699        future = tasks.ensure_future(future, loop=self)
700        if new_task:
701            # An exception is raised if the future didn't complete, so there
702            # is no need to log the "destroy pending task" message
703            future._log_destroy_pending = False
704
705        future.add_done_callback(_run_until_complete_cb)
706        try:
707            self.run_forever()
708        except:
709            if new_task and future.done() and not future.cancelled():
710                # The coroutine raised a BaseException. Consume the exception
711                # to not log a warning, the caller doesn't have access to the
712                # local task.
713                future.exception()
714            raise
715        finally:
716            future.remove_done_callback(_run_until_complete_cb)
717        if not future.done():
718            raise RuntimeError('Event loop stopped before Future completed.')
719
720        return future.result()
721
722    def stop(self):
723        """Stop running the event loop.
724
725        Every callback already scheduled will still run.  This simply informs
726        run_forever to stop looping after a complete iteration.
727        """
728        self._stopping = True
729
730    def close(self):
731        """Close the event loop.
732
733        This clears the queues and shuts down the executor,
734        but does not wait for the executor to finish.
735
736        The event loop must not be running.
737        """
738        if self.is_running():
739            raise RuntimeError("Cannot close a running event loop")
740        if self._closed:
741            return
742        if self._debug:
743            logger.debug("Close %r", self)
744        self._closed = True
745        self._ready.clear()
746        self._scheduled.clear()
747        self._executor_shutdown_called = True
748        executor = self._default_executor
749        if executor is not None:
750            self._default_executor = None
751            executor.shutdown(wait=False)
752
753    def is_closed(self):
754        """Returns True if the event loop was closed."""
755        return self._closed
756
757    def __del__(self, _warn=warnings.warn):
758        if not self.is_closed():
759            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
760            if not self.is_running():
761                self.close()
762
763    def is_running(self):
764        """Returns True if the event loop is running."""
765        return (self._thread_id is not None)
766
767    def time(self):
768        """Return the time according to the event loop's clock.
769
770        This is a float expressed in seconds since an epoch, but the
771        epoch, precision, accuracy and drift are unspecified and may
772        differ per event loop.
773        """
774        return time.monotonic()
775
776    def call_later(self, delay, callback, *args, context=None):
777        """Arrange for a callback to be called at a given time.
778
779        Return a Handle: an opaque object with a cancel() method that
780        can be used to cancel the call.
781
782        The delay can be an int or float, expressed in seconds.  It is
783        always relative to the current time.
784
785        Each callback will be called exactly once.  If two callbacks
786        are scheduled for exactly the same time, it is undefined which
787        will be called first.
788
789        Any positional arguments after the callback will be passed to
790        the callback when it is called.
791        """
792        if delay is None:
793            raise TypeError('delay must not be None')
794        timer = self.call_at(self.time() + delay, callback, *args,
795                             context=context)
796        if timer._source_traceback:
797            del timer._source_traceback[-1]
798        return timer
799
800    def call_at(self, when, callback, *args, context=None):
801        """Like call_later(), but uses an absolute time.
802
803        Absolute time corresponds to the event loop's time() method.
804        """
805        if when is None:
806            raise TypeError("when cannot be None")
807        self._check_closed()
808        if self._debug:
809            self._check_thread()
810            self._check_callback(callback, 'call_at')
811        timer = events.TimerHandle(when, callback, args, self, context)
812        if timer._source_traceback:
813            del timer._source_traceback[-1]
814        heapq.heappush(self._scheduled, timer)
815        timer._scheduled = True
816        return timer
817
818    def call_soon(self, callback, *args, context=None):
819        """Arrange for a callback to be called as soon as possible.
820
821        This operates as a FIFO queue: callbacks are called in the
822        order in which they are registered.  Each callback will be
823        called exactly once.
824
825        Any positional arguments after the callback will be passed to
826        the callback when it is called.
827        """
828        self._check_closed()
829        if self._debug:
830            self._check_thread()
831            self._check_callback(callback, 'call_soon')
832        handle = self._call_soon(callback, args, context)
833        if handle._source_traceback:
834            del handle._source_traceback[-1]
835        return handle
836
837    def _check_callback(self, callback, method):
838        if (coroutines.iscoroutine(callback) or
839                coroutines.iscoroutinefunction(callback)):
840            raise TypeError(
841                f"coroutines cannot be used with {method}()")
842        if not callable(callback):
843            raise TypeError(
844                f'a callable object was expected by {method}(), '
845                f'got {callback!r}')
846
847    def _call_soon(self, callback, args, context):
848        handle = events.Handle(callback, args, self, context)
849        if handle._source_traceback:
850            del handle._source_traceback[-1]
851        self._ready.append(handle)
852        return handle
853
854    def _check_thread(self):
855        """Check that the current thread is the thread running the event loop.
856
857        Non-thread-safe methods of this class make this assumption and will
858        likely behave incorrectly when the assumption is violated.
859
860        Should only be called when (self._debug == True).  The caller is
861        responsible for checking this condition for performance reasons.
862        """
863        if self._thread_id is None:
864            return
865        thread_id = threading.get_ident()
866        if thread_id != self._thread_id:
867            raise RuntimeError(
868                "Non-thread-safe operation invoked on an event loop other "
869                "than the current one")
870
871    def call_soon_threadsafe(self, callback, *args, context=None):
872        """Like call_soon(), but thread-safe."""
873        self._check_closed()
874        if self._debug:
875            self._check_callback(callback, 'call_soon_threadsafe')
876        handle = self._call_soon(callback, args, context)
877        if handle._source_traceback:
878            del handle._source_traceback[-1]
879        self._write_to_self()
880        return handle
881
882    def run_in_executor(self, executor, func, *args):
883        self._check_closed()
884        if self._debug:
885            self._check_callback(func, 'run_in_executor')
886        if executor is None:
887            executor = self._default_executor
888            # Only check when the default executor is being used
889            self._check_default_executor()
890            if executor is None:
891                executor = concurrent.futures.ThreadPoolExecutor(
892                    thread_name_prefix='asyncio'
893                )
894                self._default_executor = executor
895        return futures.wrap_future(
896            executor.submit(func, *args), loop=self)
897
898    def set_default_executor(self, executor):
899        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
900            raise TypeError('executor must be ThreadPoolExecutor instance')
901        self._default_executor = executor
902
903    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
904        msg = [f"{host}:{port!r}"]
905        if family:
906            msg.append(f'family={family!r}')
907        if type:
908            msg.append(f'type={type!r}')
909        if proto:
910            msg.append(f'proto={proto!r}')
911        if flags:
912            msg.append(f'flags={flags!r}')
913        msg = ', '.join(msg)
914        logger.debug('Get address info %s', msg)
915
916        t0 = self.time()
917        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
918        dt = self.time() - t0
919
920        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
921        if dt >= self.slow_callback_duration:
922            logger.info(msg)
923        else:
924            logger.debug(msg)
925        return addrinfo
926
927    async def getaddrinfo(self, host, port, *,
928                          family=0, type=0, proto=0, flags=0):
929        if self._debug:
930            getaddr_func = self._getaddrinfo_debug
931        else:
932            getaddr_func = socket.getaddrinfo
933
934        return await self.run_in_executor(
935            None, getaddr_func, host, port, family, type, proto, flags)
936
937    async def getnameinfo(self, sockaddr, flags=0):
938        return await self.run_in_executor(
939            None, socket.getnameinfo, sockaddr, flags)
940
941    async def sock_sendfile(self, sock, file, offset=0, count=None,
942                            *, fallback=True):
943        if self._debug and sock.gettimeout() != 0:
944            raise ValueError("the socket must be non-blocking")
945        _check_ssl_socket(sock)
946        self._check_sendfile_params(sock, file, offset, count)
947        try:
948            return await self._sock_sendfile_native(sock, file,
949                                                    offset, count)
950        except exceptions.SendfileNotAvailableError as exc:
951            if not fallback:
952                raise
953        return await self._sock_sendfile_fallback(sock, file,
954                                                  offset, count)
955
956    async def _sock_sendfile_native(self, sock, file, offset, count):
957        # NB: sendfile syscall is not supported for SSL sockets and
958        # non-mmap files even if sendfile is supported by OS
959        raise exceptions.SendfileNotAvailableError(
960            f"syscall sendfile is not available for socket {sock!r} "
961            f"and file {file!r} combination")
962
963    async def _sock_sendfile_fallback(self, sock, file, offset, count):
964        if offset:
965            file.seek(offset)
966        blocksize = (
967            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
968            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
969        )
970        buf = bytearray(blocksize)
971        total_sent = 0
972        try:
973            while True:
974                if count:
975                    blocksize = min(count - total_sent, blocksize)
976                    if blocksize <= 0:
977                        break
978                view = memoryview(buf)[:blocksize]
979                read = await self.run_in_executor(None, file.readinto, view)
980                if not read:
981                    break  # EOF
982                await self.sock_sendall(sock, view[:read])
983                total_sent += read
984            return total_sent
985        finally:
986            if total_sent > 0 and hasattr(file, 'seek'):
987                file.seek(offset + total_sent)
988
989    def _check_sendfile_params(self, sock, file, offset, count):
990        if 'b' not in getattr(file, 'mode', 'b'):
991            raise ValueError("file should be opened in binary mode")
992        if not sock.type == socket.SOCK_STREAM:
993            raise ValueError("only SOCK_STREAM type sockets are supported")
994        if count is not None:
995            if not isinstance(count, int):
996                raise TypeError(
997                    "count must be a positive integer (got {!r})".format(count))
998            if count <= 0:
999                raise ValueError(
1000                    "count must be a positive integer (got {!r})".format(count))
1001        if not isinstance(offset, int):
1002            raise TypeError(
1003                "offset must be a non-negative integer (got {!r})".format(
1004                    offset))
1005        if offset < 0:
1006            raise ValueError(
1007                "offset must be a non-negative integer (got {!r})".format(
1008                    offset))
1009
1010    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
1011        """Create, bind and connect one socket."""
1012        my_exceptions = []
1013        exceptions.append(my_exceptions)
1014        family, type_, proto, _, address = addr_info
1015        sock = None
1016        try:
1017            sock = socket.socket(family=family, type=type_, proto=proto)
1018            sock.setblocking(False)
1019            if local_addr_infos is not None:
1020                for lfamily, _, _, _, laddr in local_addr_infos:
1021                    # skip local addresses of different family
1022                    if lfamily != family:
1023                        continue
1024                    try:
1025                        sock.bind(laddr)
1026                        break
1027                    except OSError as exc:
1028                        msg = (
1029                            f'error while attempting to bind on '
1030                            f'address {laddr!r}: {str(exc).lower()}'
1031                        )
1032                        exc = OSError(exc.errno, msg)
1033                        my_exceptions.append(exc)
1034                else:  # all bind attempts failed
1035                    if my_exceptions:
1036                        raise my_exceptions.pop()
1037                    else:
1038                        raise OSError(f"no matching local address with {family=} found")
1039            await self.sock_connect(sock, address)
1040            return sock
1041        except OSError as exc:
1042            my_exceptions.append(exc)
1043            if sock is not None:
1044                sock.close()
1045            raise
1046        except:
1047            if sock is not None:
1048                sock.close()
1049            raise
1050        finally:
1051            exceptions = my_exceptions = None
1052
1053    async def create_connection(
1054            self, protocol_factory, host=None, port=None,
1055            *, ssl=None, family=0,
1056            proto=0, flags=0, sock=None,
1057            local_addr=None, server_hostname=None,
1058            ssl_handshake_timeout=None,
1059            ssl_shutdown_timeout=None,
1060            happy_eyeballs_delay=None, interleave=None,
1061            all_errors=False):
1062        """Connect to a TCP server.
1063
1064        Create a streaming transport connection to a given internet host and
1065        port: socket family AF_INET or socket.AF_INET6 depending on host (or
1066        family if specified), socket type SOCK_STREAM. protocol_factory must be
1067        a callable returning a protocol instance.
1068
1069        This method is a coroutine which will try to establish the connection
1070        in the background.  When successful, the coroutine returns a
1071        (transport, protocol) pair.
1072        """
1073        if server_hostname is not None and not ssl:
1074            raise ValueError('server_hostname is only meaningful with ssl')
1075
1076        if server_hostname is None and ssl:
1077            # Use host as default for server_hostname.  It is an error
1078            # if host is empty or not set, e.g. when an
1079            # already-connected socket was passed or when only a port
1080            # is given.  To avoid this error, you can pass
1081            # server_hostname='' -- this will bypass the hostname
1082            # check.  (This also means that if host is a numeric
1083            # IP/IPv6 address, we will attempt to verify that exact
1084            # address; this will probably fail, but it is possible to
1085            # create a certificate for a specific IP address, so we
1086            # don't judge it here.)
1087            if not host:
1088                raise ValueError('You must set server_hostname '
1089                                 'when using ssl without a host')
1090            server_hostname = host
1091
1092        if ssl_handshake_timeout is not None and not ssl:
1093            raise ValueError(
1094                'ssl_handshake_timeout is only meaningful with ssl')
1095
1096        if ssl_shutdown_timeout is not None and not ssl:
1097            raise ValueError(
1098                'ssl_shutdown_timeout is only meaningful with ssl')
1099
1100        if sock is not None:
1101            _check_ssl_socket(sock)
1102
1103        if happy_eyeballs_delay is not None and interleave is None:
1104            # If using happy eyeballs, default to interleave addresses by family
1105            interleave = 1
1106
1107        if host is not None or port is not None:
1108            if sock is not None:
1109                raise ValueError(
1110                    'host/port and sock can not be specified at the same time')
1111
1112            infos = await self._ensure_resolved(
1113                (host, port), family=family,
1114                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1115            if not infos:
1116                raise OSError('getaddrinfo() returned empty list')
1117
1118            if local_addr is not None:
1119                laddr_infos = await self._ensure_resolved(
1120                    local_addr, family=family,
1121                    type=socket.SOCK_STREAM, proto=proto,
1122                    flags=flags, loop=self)
1123                if not laddr_infos:
1124                    raise OSError('getaddrinfo() returned empty list')
1125            else:
1126                laddr_infos = None
1127
1128            if interleave:
1129                infos = _interleave_addrinfos(infos, interleave)
1130
1131            exceptions = []
1132            if happy_eyeballs_delay is None:
1133                # not using happy eyeballs
1134                for addrinfo in infos:
1135                    try:
1136                        sock = await self._connect_sock(
1137                            exceptions, addrinfo, laddr_infos)
1138                        break
1139                    except OSError:
1140                        continue
1141            else:  # using happy eyeballs
1142                sock = (await staggered.staggered_race(
1143                    (
1144                        # can't use functools.partial as it keeps a reference
1145                        # to exceptions
1146                        lambda addrinfo=addrinfo: self._connect_sock(
1147                            exceptions, addrinfo, laddr_infos
1148                        )
1149                        for addrinfo in infos
1150                    ),
1151                    happy_eyeballs_delay,
1152                    loop=self,
1153                ))[0]  # can't use sock, _, _ as it keeks a reference to exceptions
1154
1155            if sock is None:
1156                exceptions = [exc for sub in exceptions for exc in sub]
1157                try:
1158                    if all_errors:
1159                        raise ExceptionGroup("create_connection failed", exceptions)
1160                    if len(exceptions) == 1:
1161                        raise exceptions[0]
1162                    else:
1163                        # If they all have the same str(), raise one.
1164                        model = str(exceptions[0])
1165                        if all(str(exc) == model for exc in exceptions):
1166                            raise exceptions[0]
1167                        # Raise a combined exception so the user can see all
1168                        # the various error messages.
1169                        raise OSError('Multiple exceptions: {}'.format(
1170                            ', '.join(str(exc) for exc in exceptions)))
1171                finally:
1172                    exceptions = None
1173
1174        else:
1175            if sock is None:
1176                raise ValueError(
1177                    'host and port was not specified and no sock specified')
1178            if sock.type != socket.SOCK_STREAM:
1179                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1180                # are SOCK_STREAM.
1181                # We support passing AF_UNIX sockets even though we have
1182                # a dedicated API for that: create_unix_connection.
1183                # Disallowing AF_UNIX in this method, breaks backwards
1184                # compatibility.
1185                raise ValueError(
1186                    f'A Stream Socket was expected, got {sock!r}')
1187
1188        transport, protocol = await self._create_connection_transport(
1189            sock, protocol_factory, ssl, server_hostname,
1190            ssl_handshake_timeout=ssl_handshake_timeout,
1191            ssl_shutdown_timeout=ssl_shutdown_timeout)
1192        if self._debug:
1193            # Get the socket from the transport because SSL transport closes
1194            # the old socket and creates a new SSL socket
1195            sock = transport.get_extra_info('socket')
1196            logger.debug("%r connected to %s:%r: (%r, %r)",
1197                         sock, host, port, transport, protocol)
1198        return transport, protocol
1199
1200    async def _create_connection_transport(
1201            self, sock, protocol_factory, ssl,
1202            server_hostname, server_side=False,
1203            ssl_handshake_timeout=None,
1204            ssl_shutdown_timeout=None):
1205
1206        sock.setblocking(False)
1207
1208        protocol = protocol_factory()
1209        waiter = self.create_future()
1210        if ssl:
1211            sslcontext = None if isinstance(ssl, bool) else ssl
1212            transport = self._make_ssl_transport(
1213                sock, protocol, sslcontext, waiter,
1214                server_side=server_side, server_hostname=server_hostname,
1215                ssl_handshake_timeout=ssl_handshake_timeout,
1216                ssl_shutdown_timeout=ssl_shutdown_timeout)
1217        else:
1218            transport = self._make_socket_transport(sock, protocol, waiter)
1219
1220        try:
1221            await waiter
1222        except:
1223            transport.close()
1224            raise
1225
1226        return transport, protocol
1227
1228    async def sendfile(self, transport, file, offset=0, count=None,
1229                       *, fallback=True):
1230        """Send a file to transport.
1231
1232        Return the total number of bytes which were sent.
1233
1234        The method uses high-performance os.sendfile if available.
1235
1236        file must be a regular file object opened in binary mode.
1237
1238        offset tells from where to start reading the file. If specified,
1239        count is the total number of bytes to transmit as opposed to
1240        sending the file until EOF is reached. File position is updated on
1241        return or also in case of error in which case file.tell()
1242        can be used to figure out the number of bytes
1243        which were sent.
1244
1245        fallback set to True makes asyncio to manually read and send
1246        the file when the platform does not support the sendfile syscall
1247        (e.g. Windows or SSL socket on Unix).
1248
1249        Raise SendfileNotAvailableError if the system does not support
1250        sendfile syscall and fallback is False.
1251        """
1252        if transport.is_closing():
1253            raise RuntimeError("Transport is closing")
1254        mode = getattr(transport, '_sendfile_compatible',
1255                       constants._SendfileMode.UNSUPPORTED)
1256        if mode is constants._SendfileMode.UNSUPPORTED:
1257            raise RuntimeError(
1258                f"sendfile is not supported for transport {transport!r}")
1259        if mode is constants._SendfileMode.TRY_NATIVE:
1260            try:
1261                return await self._sendfile_native(transport, file,
1262                                                   offset, count)
1263            except exceptions.SendfileNotAvailableError as exc:
1264                if not fallback:
1265                    raise
1266
1267        if not fallback:
1268            raise RuntimeError(
1269                f"fallback is disabled and native sendfile is not "
1270                f"supported for transport {transport!r}")
1271
1272        return await self._sendfile_fallback(transport, file,
1273                                             offset, count)
1274
1275    async def _sendfile_native(self, transp, file, offset, count):
1276        raise exceptions.SendfileNotAvailableError(
1277            "sendfile syscall is not supported")
1278
1279    async def _sendfile_fallback(self, transp, file, offset, count):
1280        if offset:
1281            file.seek(offset)
1282        blocksize = min(count, 16384) if count else 16384
1283        buf = bytearray(blocksize)
1284        total_sent = 0
1285        proto = _SendfileFallbackProtocol(transp)
1286        try:
1287            while True:
1288                if count:
1289                    blocksize = min(count - total_sent, blocksize)
1290                    if blocksize <= 0:
1291                        return total_sent
1292                view = memoryview(buf)[:blocksize]
1293                read = await self.run_in_executor(None, file.readinto, view)
1294                if not read:
1295                    return total_sent  # EOF
1296                await proto.drain()
1297                transp.write(view[:read])
1298                total_sent += read
1299        finally:
1300            if total_sent > 0 and hasattr(file, 'seek'):
1301                file.seek(offset + total_sent)
1302            await proto.restore()
1303
1304    async def start_tls(self, transport, protocol, sslcontext, *,
1305                        server_side=False,
1306                        server_hostname=None,
1307                        ssl_handshake_timeout=None,
1308                        ssl_shutdown_timeout=None):
1309        """Upgrade transport to TLS.
1310
1311        Return a new transport that *protocol* should start using
1312        immediately.
1313        """
1314        if ssl is None:
1315            raise RuntimeError('Python ssl module is not available')
1316
1317        if not isinstance(sslcontext, ssl.SSLContext):
1318            raise TypeError(
1319                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1320                f'got {sslcontext!r}')
1321
1322        if not getattr(transport, '_start_tls_compatible', False):
1323            raise TypeError(
1324                f'transport {transport!r} is not supported by start_tls()')
1325
1326        waiter = self.create_future()
1327        ssl_protocol = sslproto.SSLProtocol(
1328            self, protocol, sslcontext, waiter,
1329            server_side, server_hostname,
1330            ssl_handshake_timeout=ssl_handshake_timeout,
1331            ssl_shutdown_timeout=ssl_shutdown_timeout,
1332            call_connection_made=False)
1333
1334        # Pause early so that "ssl_protocol.data_received()" doesn't
1335        # have a chance to get called before "ssl_protocol.connection_made()".
1336        transport.pause_reading()
1337
1338        transport.set_protocol(ssl_protocol)
1339        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1340        resume_cb = self.call_soon(transport.resume_reading)
1341
1342        try:
1343            await waiter
1344        except BaseException:
1345            transport.close()
1346            conmade_cb.cancel()
1347            resume_cb.cancel()
1348            raise
1349
1350        return ssl_protocol._app_transport
1351
1352    async def create_datagram_endpoint(self, protocol_factory,
1353                                       local_addr=None, remote_addr=None, *,
1354                                       family=0, proto=0, flags=0,
1355                                       reuse_port=None,
1356                                       allow_broadcast=None, sock=None):
1357        """Create datagram connection."""
1358        if sock is not None:
1359            if sock.type == socket.SOCK_STREAM:
1360                raise ValueError(
1361                    f'A datagram socket was expected, got {sock!r}')
1362            if (local_addr or remote_addr or
1363                    family or proto or flags or
1364                    reuse_port or allow_broadcast):
1365                # show the problematic kwargs in exception msg
1366                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1367                            family=family, proto=proto, flags=flags,
1368                            reuse_port=reuse_port,
1369                            allow_broadcast=allow_broadcast)
1370                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1371                raise ValueError(
1372                    f'socket modifier keyword arguments can not be used '
1373                    f'when sock is specified. ({problems})')
1374            sock.setblocking(False)
1375            r_addr = None
1376        else:
1377            if not (local_addr or remote_addr):
1378                if family == 0:
1379                    raise ValueError('unexpected address family')
1380                addr_pairs_info = (((family, proto), (None, None)),)
1381            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1382                for addr in (local_addr, remote_addr):
1383                    if addr is not None and not isinstance(addr, str):
1384                        raise TypeError('string is expected')
1385
1386                if local_addr and local_addr[0] not in (0, '\x00'):
1387                    try:
1388                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1389                            os.remove(local_addr)
1390                    except FileNotFoundError:
1391                        pass
1392                    except OSError as err:
1393                        # Directory may have permissions only to create socket.
1394                        logger.error('Unable to check or remove stale UNIX '
1395                                     'socket %r: %r',
1396                                     local_addr, err)
1397
1398                addr_pairs_info = (((family, proto),
1399                                    (local_addr, remote_addr)), )
1400            else:
1401                # join address by (family, protocol)
1402                addr_infos = {}  # Using order preserving dict
1403                for idx, addr in ((0, local_addr), (1, remote_addr)):
1404                    if addr is not None:
1405                        if not (isinstance(addr, tuple) and len(addr) == 2):
1406                            raise TypeError('2-tuple is expected')
1407
1408                        infos = await self._ensure_resolved(
1409                            addr, family=family, type=socket.SOCK_DGRAM,
1410                            proto=proto, flags=flags, loop=self)
1411                        if not infos:
1412                            raise OSError('getaddrinfo() returned empty list')
1413
1414                        for fam, _, pro, _, address in infos:
1415                            key = (fam, pro)
1416                            if key not in addr_infos:
1417                                addr_infos[key] = [None, None]
1418                            addr_infos[key][idx] = address
1419
1420                # each addr has to have info for each (family, proto) pair
1421                addr_pairs_info = [
1422                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1423                    if not ((local_addr and addr_pair[0] is None) or
1424                            (remote_addr and addr_pair[1] is None))]
1425
1426                if not addr_pairs_info:
1427                    raise ValueError('can not get address information')
1428
1429            exceptions = []
1430
1431            for ((family, proto),
1432                 (local_address, remote_address)) in addr_pairs_info:
1433                sock = None
1434                r_addr = None
1435                try:
1436                    sock = socket.socket(
1437                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1438                    if reuse_port:
1439                        _set_reuseport(sock)
1440                    if allow_broadcast:
1441                        sock.setsockopt(
1442                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1443                    sock.setblocking(False)
1444
1445                    if local_addr:
1446                        sock.bind(local_address)
1447                    if remote_addr:
1448                        if not allow_broadcast:
1449                            await self.sock_connect(sock, remote_address)
1450                        r_addr = remote_address
1451                except OSError as exc:
1452                    if sock is not None:
1453                        sock.close()
1454                    exceptions.append(exc)
1455                except:
1456                    if sock is not None:
1457                        sock.close()
1458                    raise
1459                else:
1460                    break
1461            else:
1462                raise exceptions[0]
1463
1464        protocol = protocol_factory()
1465        waiter = self.create_future()
1466        transport = self._make_datagram_transport(
1467            sock, protocol, r_addr, waiter)
1468        if self._debug:
1469            if local_addr:
1470                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1471                            "created: (%r, %r)",
1472                            local_addr, remote_addr, transport, protocol)
1473            else:
1474                logger.debug("Datagram endpoint remote_addr=%r created: "
1475                             "(%r, %r)",
1476                             remote_addr, transport, protocol)
1477
1478        try:
1479            await waiter
1480        except:
1481            transport.close()
1482            raise
1483
1484        return transport, protocol
1485
1486    async def _ensure_resolved(self, address, *,
1487                               family=0, type=socket.SOCK_STREAM,
1488                               proto=0, flags=0, loop):
1489        host, port = address[:2]
1490        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1491        if info is not None:
1492            # "host" is already a resolved IP.
1493            return [info]
1494        else:
1495            return await loop.getaddrinfo(host, port, family=family, type=type,
1496                                          proto=proto, flags=flags)
1497
1498    async def _create_server_getaddrinfo(self, host, port, family, flags):
1499        infos = await self._ensure_resolved((host, port), family=family,
1500                                            type=socket.SOCK_STREAM,
1501                                            flags=flags, loop=self)
1502        if not infos:
1503            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1504        return infos
1505
1506    async def create_server(
1507            self, protocol_factory, host=None, port=None,
1508            *,
1509            family=socket.AF_UNSPEC,
1510            flags=socket.AI_PASSIVE,
1511            sock=None,
1512            backlog=100,
1513            ssl=None,
1514            reuse_address=None,
1515            reuse_port=None,
1516            keep_alive=None,
1517            ssl_handshake_timeout=None,
1518            ssl_shutdown_timeout=None,
1519            start_serving=True):
1520        """Create a TCP server.
1521
1522        The host parameter can be a string, in that case the TCP server is
1523        bound to host and port.
1524
1525        The host parameter can also be a sequence of strings and in that case
1526        the TCP server is bound to all hosts of the sequence. If a host
1527        appears multiple times (possibly indirectly e.g. when hostnames
1528        resolve to the same IP address), the server is only bound once to that
1529        host.
1530
1531        Return a Server object which can be used to stop the service.
1532
1533        This method is a coroutine.
1534        """
1535        if isinstance(ssl, bool):
1536            raise TypeError('ssl argument must be an SSLContext or None')
1537
1538        if ssl_handshake_timeout is not None and ssl is None:
1539            raise ValueError(
1540                'ssl_handshake_timeout is only meaningful with ssl')
1541
1542        if ssl_shutdown_timeout is not None and ssl is None:
1543            raise ValueError(
1544                'ssl_shutdown_timeout is only meaningful with ssl')
1545
1546        if sock is not None:
1547            _check_ssl_socket(sock)
1548
1549        if host is not None or port is not None:
1550            if sock is not None:
1551                raise ValueError(
1552                    'host/port and sock can not be specified at the same time')
1553
1554            if reuse_address is None:
1555                reuse_address = os.name == "posix" and sys.platform != "cygwin"
1556            sockets = []
1557            if host == '':
1558                hosts = [None]
1559            elif (isinstance(host, str) or
1560                  not isinstance(host, collections.abc.Iterable)):
1561                hosts = [host]
1562            else:
1563                hosts = host
1564
1565            fs = [self._create_server_getaddrinfo(host, port, family=family,
1566                                                  flags=flags)
1567                  for host in hosts]
1568            infos = await tasks.gather(*fs)
1569            infos = set(itertools.chain.from_iterable(infos))
1570
1571            completed = False
1572            try:
1573                for res in infos:
1574                    af, socktype, proto, canonname, sa = res
1575                    try:
1576                        sock = socket.socket(af, socktype, proto)
1577                    except socket.error:
1578                        # Assume it's a bad family/type/protocol combination.
1579                        if self._debug:
1580                            logger.warning('create_server() failed to create '
1581                                           'socket.socket(%r, %r, %r)',
1582                                           af, socktype, proto, exc_info=True)
1583                        continue
1584                    sockets.append(sock)
1585                    if reuse_address:
1586                        sock.setsockopt(
1587                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1588                    if reuse_port:
1589                        _set_reuseport(sock)
1590                    if keep_alive:
1591                        sock.setsockopt(
1592                            socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
1593                    # Disable IPv4/IPv6 dual stack support (enabled by
1594                    # default on Linux) which makes a single socket
1595                    # listen on both address families.
1596                    if (_HAS_IPv6 and
1597                            af == socket.AF_INET6 and
1598                            hasattr(socket, 'IPPROTO_IPV6')):
1599                        sock.setsockopt(socket.IPPROTO_IPV6,
1600                                        socket.IPV6_V6ONLY,
1601                                        True)
1602                    try:
1603                        sock.bind(sa)
1604                    except OSError as err:
1605                        msg = ('error while attempting '
1606                               'to bind on address %r: %s'
1607                               % (sa, str(err).lower()))
1608                        if err.errno == errno.EADDRNOTAVAIL:
1609                            # Assume the family is not enabled (bpo-30945)
1610                            sockets.pop()
1611                            sock.close()
1612                            if self._debug:
1613                                logger.warning(msg)
1614                            continue
1615                        raise OSError(err.errno, msg) from None
1616
1617                if not sockets:
1618                    raise OSError('could not bind on any address out of %r'
1619                                  % ([info[4] for info in infos],))
1620
1621                completed = True
1622            finally:
1623                if not completed:
1624                    for sock in sockets:
1625                        sock.close()
1626        else:
1627            if sock is None:
1628                raise ValueError('Neither host/port nor sock were specified')
1629            if sock.type != socket.SOCK_STREAM:
1630                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1631            sockets = [sock]
1632
1633        for sock in sockets:
1634            sock.setblocking(False)
1635
1636        server = Server(self, sockets, protocol_factory,
1637                        ssl, backlog, ssl_handshake_timeout,
1638                        ssl_shutdown_timeout)
1639        if start_serving:
1640            server._start_serving()
1641            # Skip one loop iteration so that all 'loop.add_reader'
1642            # go through.
1643            await tasks.sleep(0)
1644
1645        if self._debug:
1646            logger.info("%r is serving", server)
1647        return server
1648
1649    async def connect_accepted_socket(
1650            self, protocol_factory, sock,
1651            *, ssl=None,
1652            ssl_handshake_timeout=None,
1653            ssl_shutdown_timeout=None):
1654        if sock.type != socket.SOCK_STREAM:
1655            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1656
1657        if ssl_handshake_timeout is not None and not ssl:
1658            raise ValueError(
1659                'ssl_handshake_timeout is only meaningful with ssl')
1660
1661        if ssl_shutdown_timeout is not None and not ssl:
1662            raise ValueError(
1663                'ssl_shutdown_timeout is only meaningful with ssl')
1664
1665        if sock is not None:
1666            _check_ssl_socket(sock)
1667
1668        transport, protocol = await self._create_connection_transport(
1669            sock, protocol_factory, ssl, '', server_side=True,
1670            ssl_handshake_timeout=ssl_handshake_timeout,
1671            ssl_shutdown_timeout=ssl_shutdown_timeout)
1672        if self._debug:
1673            # Get the socket from the transport because SSL transport closes
1674            # the old socket and creates a new SSL socket
1675            sock = transport.get_extra_info('socket')
1676            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1677        return transport, protocol
1678
1679    async def connect_read_pipe(self, protocol_factory, pipe):
1680        protocol = protocol_factory()
1681        waiter = self.create_future()
1682        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1683
1684        try:
1685            await waiter
1686        except:
1687            transport.close()
1688            raise
1689
1690        if self._debug:
1691            logger.debug('Read pipe %r connected: (%r, %r)',
1692                         pipe.fileno(), transport, protocol)
1693        return transport, protocol
1694
1695    async def connect_write_pipe(self, protocol_factory, pipe):
1696        protocol = protocol_factory()
1697        waiter = self.create_future()
1698        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1699
1700        try:
1701            await waiter
1702        except:
1703            transport.close()
1704            raise
1705
1706        if self._debug:
1707            logger.debug('Write pipe %r connected: (%r, %r)',
1708                         pipe.fileno(), transport, protocol)
1709        return transport, protocol
1710
1711    def _log_subprocess(self, msg, stdin, stdout, stderr):
1712        info = [msg]
1713        if stdin is not None:
1714            info.append(f'stdin={_format_pipe(stdin)}')
1715        if stdout is not None and stderr == subprocess.STDOUT:
1716            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1717        else:
1718            if stdout is not None:
1719                info.append(f'stdout={_format_pipe(stdout)}')
1720            if stderr is not None:
1721                info.append(f'stderr={_format_pipe(stderr)}')
1722        logger.debug(' '.join(info))
1723
1724    async def subprocess_shell(self, protocol_factory, cmd, *,
1725                               stdin=subprocess.PIPE,
1726                               stdout=subprocess.PIPE,
1727                               stderr=subprocess.PIPE,
1728                               universal_newlines=False,
1729                               shell=True, bufsize=0,
1730                               encoding=None, errors=None, text=None,
1731                               **kwargs):
1732        if not isinstance(cmd, (bytes, str)):
1733            raise ValueError("cmd must be a string")
1734        if universal_newlines:
1735            raise ValueError("universal_newlines must be False")
1736        if not shell:
1737            raise ValueError("shell must be True")
1738        if bufsize != 0:
1739            raise ValueError("bufsize must be 0")
1740        if text:
1741            raise ValueError("text must be False")
1742        if encoding is not None:
1743            raise ValueError("encoding must be None")
1744        if errors is not None:
1745            raise ValueError("errors must be None")
1746
1747        protocol = protocol_factory()
1748        debug_log = None
1749        if self._debug:
1750            # don't log parameters: they may contain sensitive information
1751            # (password) and may be too long
1752            debug_log = 'run shell command %r' % cmd
1753            self._log_subprocess(debug_log, stdin, stdout, stderr)
1754        transport = await self._make_subprocess_transport(
1755            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1756        if self._debug and debug_log is not None:
1757            logger.info('%s: %r', debug_log, transport)
1758        return transport, protocol
1759
1760    async def subprocess_exec(self, protocol_factory, program, *args,
1761                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1762                              stderr=subprocess.PIPE, universal_newlines=False,
1763                              shell=False, bufsize=0,
1764                              encoding=None, errors=None, text=None,
1765                              **kwargs):
1766        if universal_newlines:
1767            raise ValueError("universal_newlines must be False")
1768        if shell:
1769            raise ValueError("shell must be False")
1770        if bufsize != 0:
1771            raise ValueError("bufsize must be 0")
1772        if text:
1773            raise ValueError("text must be False")
1774        if encoding is not None:
1775            raise ValueError("encoding must be None")
1776        if errors is not None:
1777            raise ValueError("errors must be None")
1778
1779        popen_args = (program,) + args
1780        protocol = protocol_factory()
1781        debug_log = None
1782        if self._debug:
1783            # don't log parameters: they may contain sensitive information
1784            # (password) and may be too long
1785            debug_log = f'execute program {program!r}'
1786            self._log_subprocess(debug_log, stdin, stdout, stderr)
1787        transport = await self._make_subprocess_transport(
1788            protocol, popen_args, False, stdin, stdout, stderr,
1789            bufsize, **kwargs)
1790        if self._debug and debug_log is not None:
1791            logger.info('%s: %r', debug_log, transport)
1792        return transport, protocol
1793
1794    def get_exception_handler(self):
1795        """Return an exception handler, or None if the default one is in use.
1796        """
1797        return self._exception_handler
1798
1799    def set_exception_handler(self, handler):
1800        """Set handler as the new event loop exception handler.
1801
1802        If handler is None, the default exception handler will
1803        be set.
1804
1805        If handler is a callable object, it should have a
1806        signature matching '(loop, context)', where 'loop'
1807        will be a reference to the active event loop, 'context'
1808        will be a dict object (see `call_exception_handler()`
1809        documentation for details about context).
1810        """
1811        if handler is not None and not callable(handler):
1812            raise TypeError(f'A callable object or None is expected, '
1813                            f'got {handler!r}')
1814        self._exception_handler = handler
1815
1816    def default_exception_handler(self, context):
1817        """Default exception handler.
1818
1819        This is called when an exception occurs and no exception
1820        handler is set, and can be called by a custom exception
1821        handler that wants to defer to the default behavior.
1822
1823        This default handler logs the error message and other
1824        context-dependent information.  In debug mode, a truncated
1825        stack trace is also appended showing where the given object
1826        (e.g. a handle or future or task) was created, if any.
1827
1828        The context parameter has the same meaning as in
1829        `call_exception_handler()`.
1830        """
1831        message = context.get('message')
1832        if not message:
1833            message = 'Unhandled exception in event loop'
1834
1835        exception = context.get('exception')
1836        if exception is not None:
1837            exc_info = (type(exception), exception, exception.__traceback__)
1838        else:
1839            exc_info = False
1840
1841        if ('source_traceback' not in context and
1842                self._current_handle is not None and
1843                self._current_handle._source_traceback):
1844            context['handle_traceback'] = \
1845                self._current_handle._source_traceback
1846
1847        log_lines = [message]
1848        for key in sorted(context):
1849            if key in {'message', 'exception'}:
1850                continue
1851            value = context[key]
1852            if key == 'source_traceback':
1853                tb = ''.join(traceback.format_list(value))
1854                value = 'Object created at (most recent call last):\n'
1855                value += tb.rstrip()
1856            elif key == 'handle_traceback':
1857                tb = ''.join(traceback.format_list(value))
1858                value = 'Handle created at (most recent call last):\n'
1859                value += tb.rstrip()
1860            else:
1861                value = repr(value)
1862            log_lines.append(f'{key}: {value}')
1863
1864        logger.error('\n'.join(log_lines), exc_info=exc_info)
1865
1866    def call_exception_handler(self, context):
1867        """Call the current event loop's exception handler.
1868
1869        The context argument is a dict containing the following keys:
1870
1871        - 'message': Error message;
1872        - 'exception' (optional): Exception object;
1873        - 'future' (optional): Future instance;
1874        - 'task' (optional): Task instance;
1875        - 'handle' (optional): Handle instance;
1876        - 'protocol' (optional): Protocol instance;
1877        - 'transport' (optional): Transport instance;
1878        - 'socket' (optional): Socket instance;
1879        - 'asyncgen' (optional): Asynchronous generator that caused
1880                                 the exception.
1881
1882        New keys maybe introduced in the future.
1883
1884        Note: do not overload this method in an event loop subclass.
1885        For custom exception handling, use the
1886        `set_exception_handler()` method.
1887        """
1888        if self._exception_handler is None:
1889            try:
1890                self.default_exception_handler(context)
1891            except (SystemExit, KeyboardInterrupt):
1892                raise
1893            except BaseException:
1894                # Second protection layer for unexpected errors
1895                # in the default implementation, as well as for subclassed
1896                # event loops with overloaded "default_exception_handler".
1897                logger.error('Exception in default exception handler',
1898                             exc_info=True)
1899        else:
1900            try:
1901                ctx = None
1902                thing = context.get("task")
1903                if thing is None:
1904                    # Even though Futures don't have a context,
1905                    # Task is a subclass of Future,
1906                    # and sometimes the 'future' key holds a Task.
1907                    thing = context.get("future")
1908                if thing is None:
1909                    # Handles also have a context.
1910                    thing = context.get("handle")
1911                if thing is not None and hasattr(thing, "get_context"):
1912                    ctx = thing.get_context()
1913                if ctx is not None and hasattr(ctx, "run"):
1914                    ctx.run(self._exception_handler, self, context)
1915                else:
1916                    self._exception_handler(self, context)
1917            except (SystemExit, KeyboardInterrupt):
1918                raise
1919            except BaseException as exc:
1920                # Exception in the user set custom exception handler.
1921                try:
1922                    # Let's try default handler.
1923                    self.default_exception_handler({
1924                        'message': 'Unhandled error in exception handler',
1925                        'exception': exc,
1926                        'context': context,
1927                    })
1928                except (SystemExit, KeyboardInterrupt):
1929                    raise
1930                except BaseException:
1931                    # Guard 'default_exception_handler' in case it is
1932                    # overloaded.
1933                    logger.error('Exception in default exception handler '
1934                                 'while handling an unexpected error '
1935                                 'in custom exception handler',
1936                                 exc_info=True)
1937
1938    def _add_callback(self, handle):
1939        """Add a Handle to _ready."""
1940        if not handle._cancelled:
1941            self._ready.append(handle)
1942
1943    def _add_callback_signalsafe(self, handle):
1944        """Like _add_callback() but called from a signal handler."""
1945        self._add_callback(handle)
1946        self._write_to_self()
1947
1948    def _timer_handle_cancelled(self, handle):
1949        """Notification that a TimerHandle has been cancelled."""
1950        if handle._scheduled:
1951            self._timer_cancelled_count += 1
1952
1953    def _run_once(self):
1954        """Run one full iteration of the event loop.
1955
1956        This calls all currently ready callbacks, polls for I/O,
1957        schedules the resulting callbacks, and finally schedules
1958        'call_later' callbacks.
1959        """
1960
1961        sched_count = len(self._scheduled)
1962        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1963            self._timer_cancelled_count / sched_count >
1964                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1965            # Remove delayed calls that were cancelled if their number
1966            # is too high
1967            new_scheduled = []
1968            for handle in self._scheduled:
1969                if handle._cancelled:
1970                    handle._scheduled = False
1971                else:
1972                    new_scheduled.append(handle)
1973
1974            heapq.heapify(new_scheduled)
1975            self._scheduled = new_scheduled
1976            self._timer_cancelled_count = 0
1977        else:
1978            # Remove delayed calls that were cancelled from head of queue.
1979            while self._scheduled and self._scheduled[0]._cancelled:
1980                self._timer_cancelled_count -= 1
1981                handle = heapq.heappop(self._scheduled)
1982                handle._scheduled = False
1983
1984        timeout = None
1985        if self._ready or self._stopping:
1986            timeout = 0
1987        elif self._scheduled:
1988            # Compute the desired timeout.
1989            timeout = self._scheduled[0]._when - self.time()
1990            if timeout > MAXIMUM_SELECT_TIMEOUT:
1991                timeout = MAXIMUM_SELECT_TIMEOUT
1992            elif timeout < 0:
1993                timeout = 0
1994
1995        event_list = self._selector.select(timeout)
1996        self._process_events(event_list)
1997        # Needed to break cycles when an exception occurs.
1998        event_list = None
1999
2000        # Handle 'later' callbacks that are ready.
2001        end_time = self.time() + self._clock_resolution
2002        while self._scheduled:
2003            handle = self._scheduled[0]
2004            if handle._when >= end_time:
2005                break
2006            handle = heapq.heappop(self._scheduled)
2007            handle._scheduled = False
2008            self._ready.append(handle)
2009
2010        # This is the only place where callbacks are actually *called*.
2011        # All other places just add them to ready.
2012        # Note: We run all currently scheduled callbacks, but not any
2013        # callbacks scheduled by callbacks run this time around --
2014        # they will be run the next time (after another I/O poll).
2015        # Use an idiom that is thread-safe without using locks.
2016        ntodo = len(self._ready)
2017        for i in range(ntodo):
2018            handle = self._ready.popleft()
2019            if handle._cancelled:
2020                continue
2021            if self._debug:
2022                try:
2023                    self._current_handle = handle
2024                    t0 = self.time()
2025                    handle._run()
2026                    dt = self.time() - t0
2027                    if dt >= self.slow_callback_duration:
2028                        logger.warning('Executing %s took %.3f seconds',
2029                                       _format_handle(handle), dt)
2030                finally:
2031                    self._current_handle = None
2032            else:
2033                handle._run()
2034        handle = None  # Needed to break cycles when an exception occurs.
2035
2036    def _set_coroutine_origin_tracking(self, enabled):
2037        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
2038            return
2039
2040        if enabled:
2041            self._coroutine_origin_tracking_saved_depth = (
2042                sys.get_coroutine_origin_tracking_depth())
2043            sys.set_coroutine_origin_tracking_depth(
2044                constants.DEBUG_STACK_DEPTH)
2045        else:
2046            sys.set_coroutine_origin_tracking_depth(
2047                self._coroutine_origin_tracking_saved_depth)
2048
2049        self._coroutine_origin_tracking_enabled = enabled
2050
2051    def get_debug(self):
2052        return self._debug
2053
2054    def set_debug(self, enabled):
2055        self._debug = enabled
2056
2057        if self.is_running():
2058            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
2059