• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
52__all__ = 'BaseEventLoop',
53
54
55# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
70# *reuse_address* parameter
71_unset = object()
72
73
74def _format_handle(handle):
75    cb = handle._callback
76    if isinstance(getattr(cb, '__self__', None), tasks.Task):
77        # format the task
78        return repr(cb.__self__)
79    else:
80        return str(handle)
81
82
83def _format_pipe(fd):
84    if fd == subprocess.PIPE:
85        return '<pipe>'
86    elif fd == subprocess.STDOUT:
87        return '<stdout>'
88    else:
89        return repr(fd)
90
91
92def _set_reuseport(sock):
93    if not hasattr(socket, 'SO_REUSEPORT'):
94        raise ValueError('reuse_port not supported by socket module')
95    else:
96        try:
97            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
98        except OSError:
99            raise ValueError('reuse_port not supported by socket module, '
100                             'SO_REUSEPORT defined but not implemented.')
101
102
103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
104    # Try to skip getaddrinfo if "host" is already an IP. Users might have
105    # handled name resolution in their own code and pass in resolved IPs.
106    if not hasattr(socket, 'inet_pton'):
107        return
108
109    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
110            host is None:
111        return None
112
113    if type == socket.SOCK_STREAM:
114        proto = socket.IPPROTO_TCP
115    elif type == socket.SOCK_DGRAM:
116        proto = socket.IPPROTO_UDP
117    else:
118        return None
119
120    if port is None:
121        port = 0
122    elif isinstance(port, bytes) and port == b'':
123        port = 0
124    elif isinstance(port, str) and port == '':
125        port = 0
126    else:
127        # If port's a service name like "http", don't skip getaddrinfo.
128        try:
129            port = int(port)
130        except (TypeError, ValueError):
131            return None
132
133    if family == socket.AF_UNSPEC:
134        afs = [socket.AF_INET]
135        if _HAS_IPv6:
136            afs.append(socket.AF_INET6)
137    else:
138        afs = [family]
139
140    if isinstance(host, bytes):
141        host = host.decode('idna')
142    if '%' in host:
143        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
144        # like '::1%lo0'.
145        return None
146
147    for af in afs:
148        try:
149            socket.inet_pton(af, host)
150            # The host has already been resolved.
151            if _HAS_IPv6 and af == socket.AF_INET6:
152                return af, type, proto, '', (host, port, flowinfo, scopeid)
153            else:
154                return af, type, proto, '', (host, port)
155        except OSError:
156            pass
157
158    # "host" is not an IP address.
159    return None
160
161
162def _interleave_addrinfos(addrinfos, first_address_family_count=1):
163    """Interleave list of addrinfo tuples by family."""
164    # Group addresses by family
165    addrinfos_by_family = collections.OrderedDict()
166    for addr in addrinfos:
167        family = addr[0]
168        if family not in addrinfos_by_family:
169            addrinfos_by_family[family] = []
170        addrinfos_by_family[family].append(addr)
171    addrinfos_lists = list(addrinfos_by_family.values())
172
173    reordered = []
174    if first_address_family_count > 1:
175        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
176        del addrinfos_lists[0][:first_address_family_count - 1]
177    reordered.extend(
178        a for a in itertools.chain.from_iterable(
179            itertools.zip_longest(*addrinfos_lists)
180        ) if a is not None)
181    return reordered
182
183
184def _run_until_complete_cb(fut):
185    if not fut.cancelled():
186        exc = fut.exception()
187        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
188            # Issue #22429: run_forever() already finished, no need to
189            # stop it.
190            return
191    futures._get_loop(fut).stop()
192
193
194if hasattr(socket, 'TCP_NODELAY'):
195    def _set_nodelay(sock):
196        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
197                sock.type == socket.SOCK_STREAM and
198                sock.proto == socket.IPPROTO_TCP):
199            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
200else:
201    def _set_nodelay(sock):
202        pass
203
204
205class _SendfileFallbackProtocol(protocols.Protocol):
206    def __init__(self, transp):
207        if not isinstance(transp, transports._FlowControlMixin):
208            raise TypeError("transport should be _FlowControlMixin instance")
209        self._transport = transp
210        self._proto = transp.get_protocol()
211        self._should_resume_reading = transp.is_reading()
212        self._should_resume_writing = transp._protocol_paused
213        transp.pause_reading()
214        transp.set_protocol(self)
215        if self._should_resume_writing:
216            self._write_ready_fut = self._transport._loop.create_future()
217        else:
218            self._write_ready_fut = None
219
220    async def drain(self):
221        if self._transport.is_closing():
222            raise ConnectionError("Connection closed by peer")
223        fut = self._write_ready_fut
224        if fut is None:
225            return
226        await fut
227
228    def connection_made(self, transport):
229        raise RuntimeError("Invalid state: "
230                           "connection should have been established already.")
231
232    def connection_lost(self, exc):
233        if self._write_ready_fut is not None:
234            # Never happens if peer disconnects after sending the whole content
235            # Thus disconnection is always an exception from user perspective
236            if exc is None:
237                self._write_ready_fut.set_exception(
238                    ConnectionError("Connection is closed by peer"))
239            else:
240                self._write_ready_fut.set_exception(exc)
241        self._proto.connection_lost(exc)
242
243    def pause_writing(self):
244        if self._write_ready_fut is not None:
245            return
246        self._write_ready_fut = self._transport._loop.create_future()
247
248    def resume_writing(self):
249        if self._write_ready_fut is None:
250            return
251        self._write_ready_fut.set_result(False)
252        self._write_ready_fut = None
253
254    def data_received(self, data):
255        raise RuntimeError("Invalid state: reading should be paused")
256
257    def eof_received(self):
258        raise RuntimeError("Invalid state: reading should be paused")
259
260    async def restore(self):
261        self._transport.set_protocol(self._proto)
262        if self._should_resume_reading:
263            self._transport.resume_reading()
264        if self._write_ready_fut is not None:
265            # Cancel the future.
266            # Basically it has no effect because protocol is switched back,
267            # no code should wait for it anymore.
268            self._write_ready_fut.cancel()
269        if self._should_resume_writing:
270            self._proto.resume_writing()
271
272
273class Server(events.AbstractServer):
274
275    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
276                 ssl_handshake_timeout):
277        self._loop = loop
278        self._sockets = sockets
279        self._active_count = 0
280        self._waiters = []
281        self._protocol_factory = protocol_factory
282        self._backlog = backlog
283        self._ssl_context = ssl_context
284        self._ssl_handshake_timeout = ssl_handshake_timeout
285        self._serving = False
286        self._serving_forever_fut = None
287
288    def __repr__(self):
289        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
290
291    def _attach(self):
292        assert self._sockets is not None
293        self._active_count += 1
294
295    def _detach(self):
296        assert self._active_count > 0
297        self._active_count -= 1
298        if self._active_count == 0 and self._sockets is None:
299            self._wakeup()
300
301    def _wakeup(self):
302        waiters = self._waiters
303        self._waiters = None
304        for waiter in waiters:
305            if not waiter.done():
306                waiter.set_result(waiter)
307
308    def _start_serving(self):
309        if self._serving:
310            return
311        self._serving = True
312        for sock in self._sockets:
313            sock.listen(self._backlog)
314            self._loop._start_serving(
315                self._protocol_factory, sock, self._ssl_context,
316                self, self._backlog, self._ssl_handshake_timeout)
317
318    def get_loop(self):
319        return self._loop
320
321    def is_serving(self):
322        return self._serving
323
324    @property
325    def sockets(self):
326        if self._sockets is None:
327            return ()
328        return tuple(trsock.TransportSocket(s) for s in self._sockets)
329
330    def close(self):
331        sockets = self._sockets
332        if sockets is None:
333            return
334        self._sockets = None
335
336        for sock in sockets:
337            self._loop._stop_serving(sock)
338
339        self._serving = False
340
341        if (self._serving_forever_fut is not None and
342                not self._serving_forever_fut.done()):
343            self._serving_forever_fut.cancel()
344            self._serving_forever_fut = None
345
346        if self._active_count == 0:
347            self._wakeup()
348
349    async def start_serving(self):
350        self._start_serving()
351        # Skip one loop iteration so that all 'loop.add_reader'
352        # go through.
353        await tasks.sleep(0, loop=self._loop)
354
355    async def serve_forever(self):
356        if self._serving_forever_fut is not None:
357            raise RuntimeError(
358                f'server {self!r} is already being awaited on serve_forever()')
359        if self._sockets is None:
360            raise RuntimeError(f'server {self!r} is closed')
361
362        self._start_serving()
363        self._serving_forever_fut = self._loop.create_future()
364
365        try:
366            await self._serving_forever_fut
367        except exceptions.CancelledError:
368            try:
369                self.close()
370                await self.wait_closed()
371            finally:
372                raise
373        finally:
374            self._serving_forever_fut = None
375
376    async def wait_closed(self):
377        if self._sockets is None or self._waiters is None:
378            return
379        waiter = self._loop.create_future()
380        self._waiters.append(waiter)
381        await waiter
382
383
384class BaseEventLoop(events.AbstractEventLoop):
385
386    def __init__(self):
387        self._timer_cancelled_count = 0
388        self._closed = False
389        self._stopping = False
390        self._ready = collections.deque()
391        self._scheduled = []
392        self._default_executor = None
393        self._internal_fds = 0
394        # Identifier of the thread running the event loop, or None if the
395        # event loop is not running
396        self._thread_id = None
397        self._clock_resolution = time.get_clock_info('monotonic').resolution
398        self._exception_handler = None
399        self.set_debug(coroutines._is_debug_mode())
400        # In debug mode, if the execution of a callback or a step of a task
401        # exceed this duration in seconds, the slow callback/task is logged.
402        self.slow_callback_duration = 0.1
403        self._current_handle = None
404        self._task_factory = None
405        self._coroutine_origin_tracking_enabled = False
406        self._coroutine_origin_tracking_saved_depth = None
407
408        # A weak set of all asynchronous generators that are
409        # being iterated by the loop.
410        self._asyncgens = weakref.WeakSet()
411        # Set to True when `loop.shutdown_asyncgens` is called.
412        self._asyncgens_shutdown_called = False
413
414    def __repr__(self):
415        return (
416            f'<{self.__class__.__name__} running={self.is_running()} '
417            f'closed={self.is_closed()} debug={self.get_debug()}>'
418        )
419
420    def create_future(self):
421        """Create a Future object attached to the loop."""
422        return futures.Future(loop=self)
423
424    def create_task(self, coro, *, name=None):
425        """Schedule a coroutine object.
426
427        Return a task object.
428        """
429        self._check_closed()
430        if self._task_factory is None:
431            task = tasks.Task(coro, loop=self, name=name)
432            if task._source_traceback:
433                del task._source_traceback[-1]
434        else:
435            task = self._task_factory(self, coro)
436            tasks._set_task_name(task, name)
437
438        return task
439
440    def set_task_factory(self, factory):
441        """Set a task factory that will be used by loop.create_task().
442
443        If factory is None the default task factory will be set.
444
445        If factory is a callable, it should have a signature matching
446        '(loop, coro)', where 'loop' will be a reference to the active
447        event loop, 'coro' will be a coroutine object.  The callable
448        must return a Future.
449        """
450        if factory is not None and not callable(factory):
451            raise TypeError('task factory must be a callable or None')
452        self._task_factory = factory
453
454    def get_task_factory(self):
455        """Return a task factory, or None if the default one is in use."""
456        return self._task_factory
457
458    def _make_socket_transport(self, sock, protocol, waiter=None, *,
459                               extra=None, server=None):
460        """Create socket transport."""
461        raise NotImplementedError
462
463    def _make_ssl_transport(
464            self, rawsock, protocol, sslcontext, waiter=None,
465            *, server_side=False, server_hostname=None,
466            extra=None, server=None,
467            ssl_handshake_timeout=None,
468            call_connection_made=True):
469        """Create SSL transport."""
470        raise NotImplementedError
471
472    def _make_datagram_transport(self, sock, protocol,
473                                 address=None, waiter=None, extra=None):
474        """Create datagram transport."""
475        raise NotImplementedError
476
477    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
478                                  extra=None):
479        """Create read pipe transport."""
480        raise NotImplementedError
481
482    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
483                                   extra=None):
484        """Create write pipe transport."""
485        raise NotImplementedError
486
487    async def _make_subprocess_transport(self, protocol, args, shell,
488                                         stdin, stdout, stderr, bufsize,
489                                         extra=None, **kwargs):
490        """Create subprocess transport."""
491        raise NotImplementedError
492
493    def _write_to_self(self):
494        """Write a byte to self-pipe, to wake up the event loop.
495
496        This may be called from a different thread.
497
498        The subclass is responsible for implementing the self-pipe.
499        """
500        raise NotImplementedError
501
502    def _process_events(self, event_list):
503        """Process selector events."""
504        raise NotImplementedError
505
506    def _check_closed(self):
507        if self._closed:
508            raise RuntimeError('Event loop is closed')
509
510    def _asyncgen_finalizer_hook(self, agen):
511        self._asyncgens.discard(agen)
512        if not self.is_closed():
513            self.call_soon_threadsafe(self.create_task, agen.aclose())
514
515    def _asyncgen_firstiter_hook(self, agen):
516        if self._asyncgens_shutdown_called:
517            warnings.warn(
518                f"asynchronous generator {agen!r} was scheduled after "
519                f"loop.shutdown_asyncgens() call",
520                ResourceWarning, source=self)
521
522        self._asyncgens.add(agen)
523
524    async def shutdown_asyncgens(self):
525        """Shutdown all active asynchronous generators."""
526        self._asyncgens_shutdown_called = True
527
528        if not len(self._asyncgens):
529            # If Python version is <3.6 or we don't have any asynchronous
530            # generators alive.
531            return
532
533        closing_agens = list(self._asyncgens)
534        self._asyncgens.clear()
535
536        results = await tasks.gather(
537            *[ag.aclose() for ag in closing_agens],
538            return_exceptions=True,
539            loop=self)
540
541        for result, agen in zip(results, closing_agens):
542            if isinstance(result, Exception):
543                self.call_exception_handler({
544                    'message': f'an error occurred during closing of '
545                               f'asynchronous generator {agen!r}',
546                    'exception': result,
547                    'asyncgen': agen
548                })
549
550    def run_forever(self):
551        """Run until stop() is called."""
552        self._check_closed()
553        if self.is_running():
554            raise RuntimeError('This event loop is already running')
555        if events._get_running_loop() is not None:
556            raise RuntimeError(
557                'Cannot run the event loop while another loop is running')
558        self._set_coroutine_origin_tracking(self._debug)
559        self._thread_id = threading.get_ident()
560
561        old_agen_hooks = sys.get_asyncgen_hooks()
562        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
563                               finalizer=self._asyncgen_finalizer_hook)
564        try:
565            events._set_running_loop(self)
566            while True:
567                self._run_once()
568                if self._stopping:
569                    break
570        finally:
571            self._stopping = False
572            self._thread_id = None
573            events._set_running_loop(None)
574            self._set_coroutine_origin_tracking(False)
575            sys.set_asyncgen_hooks(*old_agen_hooks)
576
577    def run_until_complete(self, future):
578        """Run until the Future is done.
579
580        If the argument is a coroutine, it is wrapped in a Task.
581
582        WARNING: It would be disastrous to call run_until_complete()
583        with the same coroutine twice -- it would wrap it in two
584        different Tasks and that can't be good.
585
586        Return the Future's result, or raise its exception.
587        """
588        self._check_closed()
589
590        new_task = not futures.isfuture(future)
591        future = tasks.ensure_future(future, loop=self)
592        if new_task:
593            # An exception is raised if the future didn't complete, so there
594            # is no need to log the "destroy pending task" message
595            future._log_destroy_pending = False
596
597        future.add_done_callback(_run_until_complete_cb)
598        try:
599            self.run_forever()
600        except:
601            if new_task and future.done() and not future.cancelled():
602                # The coroutine raised a BaseException. Consume the exception
603                # to not log a warning, the caller doesn't have access to the
604                # local task.
605                future.exception()
606            raise
607        finally:
608            future.remove_done_callback(_run_until_complete_cb)
609        if not future.done():
610            raise RuntimeError('Event loop stopped before Future completed.')
611
612        return future.result()
613
614    def stop(self):
615        """Stop running the event loop.
616
617        Every callback already scheduled will still run.  This simply informs
618        run_forever to stop looping after a complete iteration.
619        """
620        self._stopping = True
621
622    def close(self):
623        """Close the event loop.
624
625        This clears the queues and shuts down the executor,
626        but does not wait for the executor to finish.
627
628        The event loop must not be running.
629        """
630        if self.is_running():
631            raise RuntimeError("Cannot close a running event loop")
632        if self._closed:
633            return
634        if self._debug:
635            logger.debug("Close %r", self)
636        self._closed = True
637        self._ready.clear()
638        self._scheduled.clear()
639        executor = self._default_executor
640        if executor is not None:
641            self._default_executor = None
642            executor.shutdown(wait=False)
643
644    def is_closed(self):
645        """Returns True if the event loop was closed."""
646        return self._closed
647
648    def __del__(self, _warn=warnings.warn):
649        if not self.is_closed():
650            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
651            if not self.is_running():
652                self.close()
653
654    def is_running(self):
655        """Returns True if the event loop is running."""
656        return (self._thread_id is not None)
657
658    def time(self):
659        """Return the time according to the event loop's clock.
660
661        This is a float expressed in seconds since an epoch, but the
662        epoch, precision, accuracy and drift are unspecified and may
663        differ per event loop.
664        """
665        return time.monotonic()
666
667    def call_later(self, delay, callback, *args, context=None):
668        """Arrange for a callback to be called at a given time.
669
670        Return a Handle: an opaque object with a cancel() method that
671        can be used to cancel the call.
672
673        The delay can be an int or float, expressed in seconds.  It is
674        always relative to the current time.
675
676        Each callback will be called exactly once.  If two callbacks
677        are scheduled for exactly the same time, it undefined which
678        will be called first.
679
680        Any positional arguments after the callback will be passed to
681        the callback when it is called.
682        """
683        timer = self.call_at(self.time() + delay, callback, *args,
684                             context=context)
685        if timer._source_traceback:
686            del timer._source_traceback[-1]
687        return timer
688
689    def call_at(self, when, callback, *args, context=None):
690        """Like call_later(), but uses an absolute time.
691
692        Absolute time corresponds to the event loop's time() method.
693        """
694        self._check_closed()
695        if self._debug:
696            self._check_thread()
697            self._check_callback(callback, 'call_at')
698        timer = events.TimerHandle(when, callback, args, self, context)
699        if timer._source_traceback:
700            del timer._source_traceback[-1]
701        heapq.heappush(self._scheduled, timer)
702        timer._scheduled = True
703        return timer
704
705    def call_soon(self, callback, *args, context=None):
706        """Arrange for a callback to be called as soon as possible.
707
708        This operates as a FIFO queue: callbacks are called in the
709        order in which they are registered.  Each callback will be
710        called exactly once.
711
712        Any positional arguments after the callback will be passed to
713        the callback when it is called.
714        """
715        self._check_closed()
716        if self._debug:
717            self._check_thread()
718            self._check_callback(callback, 'call_soon')
719        handle = self._call_soon(callback, args, context)
720        if handle._source_traceback:
721            del handle._source_traceback[-1]
722        return handle
723
724    def _check_callback(self, callback, method):
725        if (coroutines.iscoroutine(callback) or
726                coroutines.iscoroutinefunction(callback)):
727            raise TypeError(
728                f"coroutines cannot be used with {method}()")
729        if not callable(callback):
730            raise TypeError(
731                f'a callable object was expected by {method}(), '
732                f'got {callback!r}')
733
734    def _call_soon(self, callback, args, context):
735        handle = events.Handle(callback, args, self, context)
736        if handle._source_traceback:
737            del handle._source_traceback[-1]
738        self._ready.append(handle)
739        return handle
740
741    def _check_thread(self):
742        """Check that the current thread is the thread running the event loop.
743
744        Non-thread-safe methods of this class make this assumption and will
745        likely behave incorrectly when the assumption is violated.
746
747        Should only be called when (self._debug == True).  The caller is
748        responsible for checking this condition for performance reasons.
749        """
750        if self._thread_id is None:
751            return
752        thread_id = threading.get_ident()
753        if thread_id != self._thread_id:
754            raise RuntimeError(
755                "Non-thread-safe operation invoked on an event loop other "
756                "than the current one")
757
758    def call_soon_threadsafe(self, callback, *args, context=None):
759        """Like call_soon(), but thread-safe."""
760        self._check_closed()
761        if self._debug:
762            self._check_callback(callback, 'call_soon_threadsafe')
763        handle = self._call_soon(callback, args, context)
764        if handle._source_traceback:
765            del handle._source_traceback[-1]
766        self._write_to_self()
767        return handle
768
769    def run_in_executor(self, executor, func, *args):
770        self._check_closed()
771        if self._debug:
772            self._check_callback(func, 'run_in_executor')
773        if executor is None:
774            executor = self._default_executor
775            if executor is None:
776                executor = concurrent.futures.ThreadPoolExecutor()
777                self._default_executor = executor
778        return futures.wrap_future(
779            executor.submit(func, *args), loop=self)
780
781    def set_default_executor(self, executor):
782        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
783            warnings.warn(
784                'Using the default executor that is not an instance of '
785                'ThreadPoolExecutor is deprecated and will be prohibited '
786                'in Python 3.9',
787                DeprecationWarning, 2)
788        self._default_executor = executor
789
790    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
791        msg = [f"{host}:{port!r}"]
792        if family:
793            msg.append(f'family={family!r}')
794        if type:
795            msg.append(f'type={type!r}')
796        if proto:
797            msg.append(f'proto={proto!r}')
798        if flags:
799            msg.append(f'flags={flags!r}')
800        msg = ', '.join(msg)
801        logger.debug('Get address info %s', msg)
802
803        t0 = self.time()
804        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
805        dt = self.time() - t0
806
807        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
808        if dt >= self.slow_callback_duration:
809            logger.info(msg)
810        else:
811            logger.debug(msg)
812        return addrinfo
813
814    async def getaddrinfo(self, host, port, *,
815                          family=0, type=0, proto=0, flags=0):
816        if self._debug:
817            getaddr_func = self._getaddrinfo_debug
818        else:
819            getaddr_func = socket.getaddrinfo
820
821        return await self.run_in_executor(
822            None, getaddr_func, host, port, family, type, proto, flags)
823
824    async def getnameinfo(self, sockaddr, flags=0):
825        return await self.run_in_executor(
826            None, socket.getnameinfo, sockaddr, flags)
827
828    async def sock_sendfile(self, sock, file, offset=0, count=None,
829                            *, fallback=True):
830        if self._debug and sock.gettimeout() != 0:
831            raise ValueError("the socket must be non-blocking")
832        self._check_sendfile_params(sock, file, offset, count)
833        try:
834            return await self._sock_sendfile_native(sock, file,
835                                                    offset, count)
836        except exceptions.SendfileNotAvailableError as exc:
837            if not fallback:
838                raise
839        return await self._sock_sendfile_fallback(sock, file,
840                                                  offset, count)
841
842    async def _sock_sendfile_native(self, sock, file, offset, count):
843        # NB: sendfile syscall is not supported for SSL sockets and
844        # non-mmap files even if sendfile is supported by OS
845        raise exceptions.SendfileNotAvailableError(
846            f"syscall sendfile is not available for socket {sock!r} "
847            "and file {file!r} combination")
848
849    async def _sock_sendfile_fallback(self, sock, file, offset, count):
850        if offset:
851            file.seek(offset)
852        blocksize = (
853            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
854            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
855        )
856        buf = bytearray(blocksize)
857        total_sent = 0
858        try:
859            while True:
860                if count:
861                    blocksize = min(count - total_sent, blocksize)
862                    if blocksize <= 0:
863                        break
864                view = memoryview(buf)[:blocksize]
865                read = await self.run_in_executor(None, file.readinto, view)
866                if not read:
867                    break  # EOF
868                await self.sock_sendall(sock, view[:read])
869                total_sent += read
870            return total_sent
871        finally:
872            if total_sent > 0 and hasattr(file, 'seek'):
873                file.seek(offset + total_sent)
874
875    def _check_sendfile_params(self, sock, file, offset, count):
876        if 'b' not in getattr(file, 'mode', 'b'):
877            raise ValueError("file should be opened in binary mode")
878        if not sock.type == socket.SOCK_STREAM:
879            raise ValueError("only SOCK_STREAM type sockets are supported")
880        if count is not None:
881            if not isinstance(count, int):
882                raise TypeError(
883                    "count must be a positive integer (got {!r})".format(count))
884            if count <= 0:
885                raise ValueError(
886                    "count must be a positive integer (got {!r})".format(count))
887        if not isinstance(offset, int):
888            raise TypeError(
889                "offset must be a non-negative integer (got {!r})".format(
890                    offset))
891        if offset < 0:
892            raise ValueError(
893                "offset must be a non-negative integer (got {!r})".format(
894                    offset))
895
896    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
897        """Create, bind and connect one socket."""
898        my_exceptions = []
899        exceptions.append(my_exceptions)
900        family, type_, proto, _, address = addr_info
901        sock = None
902        try:
903            sock = socket.socket(family=family, type=type_, proto=proto)
904            sock.setblocking(False)
905            if local_addr_infos is not None:
906                for _, _, _, _, laddr in local_addr_infos:
907                    try:
908                        sock.bind(laddr)
909                        break
910                    except OSError as exc:
911                        msg = (
912                            f'error while attempting to bind on '
913                            f'address {laddr!r}: '
914                            f'{exc.strerror.lower()}'
915                        )
916                        exc = OSError(exc.errno, msg)
917                        my_exceptions.append(exc)
918                else:  # all bind attempts failed
919                    raise my_exceptions.pop()
920            await self.sock_connect(sock, address)
921            return sock
922        except OSError as exc:
923            my_exceptions.append(exc)
924            if sock is not None:
925                sock.close()
926            raise
927        except:
928            if sock is not None:
929                sock.close()
930            raise
931
932    async def create_connection(
933            self, protocol_factory, host=None, port=None,
934            *, ssl=None, family=0,
935            proto=0, flags=0, sock=None,
936            local_addr=None, server_hostname=None,
937            ssl_handshake_timeout=None,
938            happy_eyeballs_delay=None, interleave=None):
939        """Connect to a TCP server.
940
941        Create a streaming transport connection to a given Internet host and
942        port: socket family AF_INET or socket.AF_INET6 depending on host (or
943        family if specified), socket type SOCK_STREAM. protocol_factory must be
944        a callable returning a protocol instance.
945
946        This method is a coroutine which will try to establish the connection
947        in the background.  When successful, the coroutine returns a
948        (transport, protocol) pair.
949        """
950        if server_hostname is not None and not ssl:
951            raise ValueError('server_hostname is only meaningful with ssl')
952
953        if server_hostname is None and ssl:
954            # Use host as default for server_hostname.  It is an error
955            # if host is empty or not set, e.g. when an
956            # already-connected socket was passed or when only a port
957            # is given.  To avoid this error, you can pass
958            # server_hostname='' -- this will bypass the hostname
959            # check.  (This also means that if host is a numeric
960            # IP/IPv6 address, we will attempt to verify that exact
961            # address; this will probably fail, but it is possible to
962            # create a certificate for a specific IP address, so we
963            # don't judge it here.)
964            if not host:
965                raise ValueError('You must set server_hostname '
966                                 'when using ssl without a host')
967            server_hostname = host
968
969        if ssl_handshake_timeout is not None and not ssl:
970            raise ValueError(
971                'ssl_handshake_timeout is only meaningful with ssl')
972
973        if happy_eyeballs_delay is not None and interleave is None:
974            # If using happy eyeballs, default to interleave addresses by family
975            interleave = 1
976
977        if host is not None or port is not None:
978            if sock is not None:
979                raise ValueError(
980                    'host/port and sock can not be specified at the same time')
981
982            infos = await self._ensure_resolved(
983                (host, port), family=family,
984                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
985            if not infos:
986                raise OSError('getaddrinfo() returned empty list')
987
988            if local_addr is not None:
989                laddr_infos = await self._ensure_resolved(
990                    local_addr, family=family,
991                    type=socket.SOCK_STREAM, proto=proto,
992                    flags=flags, loop=self)
993                if not laddr_infos:
994                    raise OSError('getaddrinfo() returned empty list')
995            else:
996                laddr_infos = None
997
998            if interleave:
999                infos = _interleave_addrinfos(infos, interleave)
1000
1001            exceptions = []
1002            if happy_eyeballs_delay is None:
1003                # not using happy eyeballs
1004                for addrinfo in infos:
1005                    try:
1006                        sock = await self._connect_sock(
1007                            exceptions, addrinfo, laddr_infos)
1008                        break
1009                    except OSError:
1010                        continue
1011            else:  # using happy eyeballs
1012                sock, _, _ = await staggered.staggered_race(
1013                    (functools.partial(self._connect_sock,
1014                                       exceptions, addrinfo, laddr_infos)
1015                     for addrinfo in infos),
1016                    happy_eyeballs_delay, loop=self)
1017
1018            if sock is None:
1019                exceptions = [exc for sub in exceptions for exc in sub]
1020                if len(exceptions) == 1:
1021                    raise exceptions[0]
1022                else:
1023                    # If they all have the same str(), raise one.
1024                    model = str(exceptions[0])
1025                    if all(str(exc) == model for exc in exceptions):
1026                        raise exceptions[0]
1027                    # Raise a combined exception so the user can see all
1028                    # the various error messages.
1029                    raise OSError('Multiple exceptions: {}'.format(
1030                        ', '.join(str(exc) for exc in exceptions)))
1031
1032        else:
1033            if sock is None:
1034                raise ValueError(
1035                    'host and port was not specified and no sock specified')
1036            if sock.type != socket.SOCK_STREAM:
1037                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1038                # are SOCK_STREAM.
1039                # We support passing AF_UNIX sockets even though we have
1040                # a dedicated API for that: create_unix_connection.
1041                # Disallowing AF_UNIX in this method, breaks backwards
1042                # compatibility.
1043                raise ValueError(
1044                    f'A Stream Socket was expected, got {sock!r}')
1045
1046        transport, protocol = await self._create_connection_transport(
1047            sock, protocol_factory, ssl, server_hostname,
1048            ssl_handshake_timeout=ssl_handshake_timeout)
1049        if self._debug:
1050            # Get the socket from the transport because SSL transport closes
1051            # the old socket and creates a new SSL socket
1052            sock = transport.get_extra_info('socket')
1053            logger.debug("%r connected to %s:%r: (%r, %r)",
1054                         sock, host, port, transport, protocol)
1055        return transport, protocol
1056
1057    async def _create_connection_transport(
1058            self, sock, protocol_factory, ssl,
1059            server_hostname, server_side=False,
1060            ssl_handshake_timeout=None):
1061
1062        sock.setblocking(False)
1063
1064        protocol = protocol_factory()
1065        waiter = self.create_future()
1066        if ssl:
1067            sslcontext = None if isinstance(ssl, bool) else ssl
1068            transport = self._make_ssl_transport(
1069                sock, protocol, sslcontext, waiter,
1070                server_side=server_side, server_hostname=server_hostname,
1071                ssl_handshake_timeout=ssl_handshake_timeout)
1072        else:
1073            transport = self._make_socket_transport(sock, protocol, waiter)
1074
1075        try:
1076            await waiter
1077        except:
1078            transport.close()
1079            raise
1080
1081        return transport, protocol
1082
1083    async def sendfile(self, transport, file, offset=0, count=None,
1084                       *, fallback=True):
1085        """Send a file to transport.
1086
1087        Return the total number of bytes which were sent.
1088
1089        The method uses high-performance os.sendfile if available.
1090
1091        file must be a regular file object opened in binary mode.
1092
1093        offset tells from where to start reading the file. If specified,
1094        count is the total number of bytes to transmit as opposed to
1095        sending the file until EOF is reached. File position is updated on
1096        return or also in case of error in which case file.tell()
1097        can be used to figure out the number of bytes
1098        which were sent.
1099
1100        fallback set to True makes asyncio to manually read and send
1101        the file when the platform does not support the sendfile syscall
1102        (e.g. Windows or SSL socket on Unix).
1103
1104        Raise SendfileNotAvailableError if the system does not support
1105        sendfile syscall and fallback is False.
1106        """
1107        if transport.is_closing():
1108            raise RuntimeError("Transport is closing")
1109        mode = getattr(transport, '_sendfile_compatible',
1110                       constants._SendfileMode.UNSUPPORTED)
1111        if mode is constants._SendfileMode.UNSUPPORTED:
1112            raise RuntimeError(
1113                f"sendfile is not supported for transport {transport!r}")
1114        if mode is constants._SendfileMode.TRY_NATIVE:
1115            try:
1116                return await self._sendfile_native(transport, file,
1117                                                   offset, count)
1118            except exceptions.SendfileNotAvailableError as exc:
1119                if not fallback:
1120                    raise
1121
1122        if not fallback:
1123            raise RuntimeError(
1124                f"fallback is disabled and native sendfile is not "
1125                f"supported for transport {transport!r}")
1126
1127        return await self._sendfile_fallback(transport, file,
1128                                             offset, count)
1129
1130    async def _sendfile_native(self, transp, file, offset, count):
1131        raise exceptions.SendfileNotAvailableError(
1132            "sendfile syscall is not supported")
1133
1134    async def _sendfile_fallback(self, transp, file, offset, count):
1135        if offset:
1136            file.seek(offset)
1137        blocksize = min(count, 16384) if count else 16384
1138        buf = bytearray(blocksize)
1139        total_sent = 0
1140        proto = _SendfileFallbackProtocol(transp)
1141        try:
1142            while True:
1143                if count:
1144                    blocksize = min(count - total_sent, blocksize)
1145                    if blocksize <= 0:
1146                        return total_sent
1147                view = memoryview(buf)[:blocksize]
1148                read = await self.run_in_executor(None, file.readinto, view)
1149                if not read:
1150                    return total_sent  # EOF
1151                await proto.drain()
1152                transp.write(view[:read])
1153                total_sent += read
1154        finally:
1155            if total_sent > 0 and hasattr(file, 'seek'):
1156                file.seek(offset + total_sent)
1157            await proto.restore()
1158
1159    async def start_tls(self, transport, protocol, sslcontext, *,
1160                        server_side=False,
1161                        server_hostname=None,
1162                        ssl_handshake_timeout=None):
1163        """Upgrade transport to TLS.
1164
1165        Return a new transport that *protocol* should start using
1166        immediately.
1167        """
1168        if ssl is None:
1169            raise RuntimeError('Python ssl module is not available')
1170
1171        if not isinstance(sslcontext, ssl.SSLContext):
1172            raise TypeError(
1173                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1174                f'got {sslcontext!r}')
1175
1176        if not getattr(transport, '_start_tls_compatible', False):
1177            raise TypeError(
1178                f'transport {transport!r} is not supported by start_tls()')
1179
1180        waiter = self.create_future()
1181        ssl_protocol = sslproto.SSLProtocol(
1182            self, protocol, sslcontext, waiter,
1183            server_side, server_hostname,
1184            ssl_handshake_timeout=ssl_handshake_timeout,
1185            call_connection_made=False)
1186
1187        # Pause early so that "ssl_protocol.data_received()" doesn't
1188        # have a chance to get called before "ssl_protocol.connection_made()".
1189        transport.pause_reading()
1190
1191        transport.set_protocol(ssl_protocol)
1192        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1193        resume_cb = self.call_soon(transport.resume_reading)
1194
1195        try:
1196            await waiter
1197        except BaseException:
1198            transport.close()
1199            conmade_cb.cancel()
1200            resume_cb.cancel()
1201            raise
1202
1203        return ssl_protocol._app_transport
1204
1205    async def create_datagram_endpoint(self, protocol_factory,
1206                                       local_addr=None, remote_addr=None, *,
1207                                       family=0, proto=0, flags=0,
1208                                       reuse_address=_unset, reuse_port=None,
1209                                       allow_broadcast=None, sock=None):
1210        """Create datagram connection."""
1211        if sock is not None:
1212            if sock.type != socket.SOCK_DGRAM:
1213                raise ValueError(
1214                    f'A UDP Socket was expected, got {sock!r}')
1215            if (local_addr or remote_addr or
1216                    family or proto or flags or
1217                    reuse_port or allow_broadcast):
1218                # show the problematic kwargs in exception msg
1219                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1220                            family=family, proto=proto, flags=flags,
1221                            reuse_address=reuse_address, reuse_port=reuse_port,
1222                            allow_broadcast=allow_broadcast)
1223                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1224                raise ValueError(
1225                    f'socket modifier keyword arguments can not be used '
1226                    f'when sock is specified. ({problems})')
1227            sock.setblocking(False)
1228            r_addr = None
1229        else:
1230            if not (local_addr or remote_addr):
1231                if family == 0:
1232                    raise ValueError('unexpected address family')
1233                addr_pairs_info = (((family, proto), (None, None)),)
1234            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1235                for addr in (local_addr, remote_addr):
1236                    if addr is not None and not isinstance(addr, str):
1237                        raise TypeError('string is expected')
1238
1239                if local_addr and local_addr[0] not in (0, '\x00'):
1240                    try:
1241                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1242                            os.remove(local_addr)
1243                    except FileNotFoundError:
1244                        pass
1245                    except OSError as err:
1246                        # Directory may have permissions only to create socket.
1247                        logger.error('Unable to check or remove stale UNIX '
1248                                     'socket %r: %r',
1249                                     local_addr, err)
1250
1251                addr_pairs_info = (((family, proto),
1252                                    (local_addr, remote_addr)), )
1253            else:
1254                # join address by (family, protocol)
1255                addr_infos = {}  # Using order preserving dict
1256                for idx, addr in ((0, local_addr), (1, remote_addr)):
1257                    if addr is not None:
1258                        assert isinstance(addr, tuple) and len(addr) == 2, (
1259                            '2-tuple is expected')
1260
1261                        infos = await self._ensure_resolved(
1262                            addr, family=family, type=socket.SOCK_DGRAM,
1263                            proto=proto, flags=flags, loop=self)
1264                        if not infos:
1265                            raise OSError('getaddrinfo() returned empty list')
1266
1267                        for fam, _, pro, _, address in infos:
1268                            key = (fam, pro)
1269                            if key not in addr_infos:
1270                                addr_infos[key] = [None, None]
1271                            addr_infos[key][idx] = address
1272
1273                # each addr has to have info for each (family, proto) pair
1274                addr_pairs_info = [
1275                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1276                    if not ((local_addr and addr_pair[0] is None) or
1277                            (remote_addr and addr_pair[1] is None))]
1278
1279                if not addr_pairs_info:
1280                    raise ValueError('can not get address information')
1281
1282            exceptions = []
1283
1284            # bpo-37228
1285            if reuse_address is not _unset:
1286                if reuse_address:
1287                    raise ValueError("Passing `reuse_address=True` is no "
1288                                     "longer supported, as the usage of "
1289                                     "SO_REUSEPORT in UDP poses a significant "
1290                                     "security concern.")
1291                else:
1292                    warnings.warn("The *reuse_address* parameter has been "
1293                                  "deprecated as of 3.5.10 and is scheduled "
1294                                  "for removal in 3.11.", DeprecationWarning,
1295                                  stacklevel=2)
1296
1297            for ((family, proto),
1298                 (local_address, remote_address)) in addr_pairs_info:
1299                sock = None
1300                r_addr = None
1301                try:
1302                    sock = socket.socket(
1303                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1304                    if reuse_port:
1305                        _set_reuseport(sock)
1306                    if allow_broadcast:
1307                        sock.setsockopt(
1308                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1309                    sock.setblocking(False)
1310
1311                    if local_addr:
1312                        sock.bind(local_address)
1313                    if remote_addr:
1314                        if not allow_broadcast:
1315                            await self.sock_connect(sock, remote_address)
1316                        r_addr = remote_address
1317                except OSError as exc:
1318                    if sock is not None:
1319                        sock.close()
1320                    exceptions.append(exc)
1321                except:
1322                    if sock is not None:
1323                        sock.close()
1324                    raise
1325                else:
1326                    break
1327            else:
1328                raise exceptions[0]
1329
1330        protocol = protocol_factory()
1331        waiter = self.create_future()
1332        transport = self._make_datagram_transport(
1333            sock, protocol, r_addr, waiter)
1334        if self._debug:
1335            if local_addr:
1336                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1337                            "created: (%r, %r)",
1338                            local_addr, remote_addr, transport, protocol)
1339            else:
1340                logger.debug("Datagram endpoint remote_addr=%r created: "
1341                             "(%r, %r)",
1342                             remote_addr, transport, protocol)
1343
1344        try:
1345            await waiter
1346        except:
1347            transport.close()
1348            raise
1349
1350        return transport, protocol
1351
1352    async def _ensure_resolved(self, address, *,
1353                               family=0, type=socket.SOCK_STREAM,
1354                               proto=0, flags=0, loop):
1355        host, port = address[:2]
1356        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1357        if info is not None:
1358            # "host" is already a resolved IP.
1359            return [info]
1360        else:
1361            return await loop.getaddrinfo(host, port, family=family, type=type,
1362                                          proto=proto, flags=flags)
1363
1364    async def _create_server_getaddrinfo(self, host, port, family, flags):
1365        infos = await self._ensure_resolved((host, port), family=family,
1366                                            type=socket.SOCK_STREAM,
1367                                            flags=flags, loop=self)
1368        if not infos:
1369            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1370        return infos
1371
1372    async def create_server(
1373            self, protocol_factory, host=None, port=None,
1374            *,
1375            family=socket.AF_UNSPEC,
1376            flags=socket.AI_PASSIVE,
1377            sock=None,
1378            backlog=100,
1379            ssl=None,
1380            reuse_address=None,
1381            reuse_port=None,
1382            ssl_handshake_timeout=None,
1383            start_serving=True):
1384        """Create a TCP server.
1385
1386        The host parameter can be a string, in that case the TCP server is
1387        bound to host and port.
1388
1389        The host parameter can also be a sequence of strings and in that case
1390        the TCP server is bound to all hosts of the sequence. If a host
1391        appears multiple times (possibly indirectly e.g. when hostnames
1392        resolve to the same IP address), the server is only bound once to that
1393        host.
1394
1395        Return a Server object which can be used to stop the service.
1396
1397        This method is a coroutine.
1398        """
1399        if isinstance(ssl, bool):
1400            raise TypeError('ssl argument must be an SSLContext or None')
1401
1402        if ssl_handshake_timeout is not None and ssl is None:
1403            raise ValueError(
1404                'ssl_handshake_timeout is only meaningful with ssl')
1405
1406        if host is not None or port is not None:
1407            if sock is not None:
1408                raise ValueError(
1409                    'host/port and sock can not be specified at the same time')
1410
1411            if reuse_address is None:
1412                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1413            sockets = []
1414            if host == '':
1415                hosts = [None]
1416            elif (isinstance(host, str) or
1417                  not isinstance(host, collections.abc.Iterable)):
1418                hosts = [host]
1419            else:
1420                hosts = host
1421
1422            fs = [self._create_server_getaddrinfo(host, port, family=family,
1423                                                  flags=flags)
1424                  for host in hosts]
1425            infos = await tasks.gather(*fs, loop=self)
1426            infos = set(itertools.chain.from_iterable(infos))
1427
1428            completed = False
1429            try:
1430                for res in infos:
1431                    af, socktype, proto, canonname, sa = res
1432                    try:
1433                        sock = socket.socket(af, socktype, proto)
1434                    except socket.error:
1435                        # Assume it's a bad family/type/protocol combination.
1436                        if self._debug:
1437                            logger.warning('create_server() failed to create '
1438                                           'socket.socket(%r, %r, %r)',
1439                                           af, socktype, proto, exc_info=True)
1440                        continue
1441                    sockets.append(sock)
1442                    if reuse_address:
1443                        sock.setsockopt(
1444                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1445                    if reuse_port:
1446                        _set_reuseport(sock)
1447                    # Disable IPv4/IPv6 dual stack support (enabled by
1448                    # default on Linux) which makes a single socket
1449                    # listen on both address families.
1450                    if (_HAS_IPv6 and
1451                            af == socket.AF_INET6 and
1452                            hasattr(socket, 'IPPROTO_IPV6')):
1453                        sock.setsockopt(socket.IPPROTO_IPV6,
1454                                        socket.IPV6_V6ONLY,
1455                                        True)
1456                    try:
1457                        sock.bind(sa)
1458                    except OSError as err:
1459                        raise OSError(err.errno, 'error while attempting '
1460                                      'to bind on address %r: %s'
1461                                      % (sa, err.strerror.lower())) from None
1462                completed = True
1463            finally:
1464                if not completed:
1465                    for sock in sockets:
1466                        sock.close()
1467        else:
1468            if sock is None:
1469                raise ValueError('Neither host/port nor sock were specified')
1470            if sock.type != socket.SOCK_STREAM:
1471                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1472            sockets = [sock]
1473
1474        for sock in sockets:
1475            sock.setblocking(False)
1476
1477        server = Server(self, sockets, protocol_factory,
1478                        ssl, backlog, ssl_handshake_timeout)
1479        if start_serving:
1480            server._start_serving()
1481            # Skip one loop iteration so that all 'loop.add_reader'
1482            # go through.
1483            await tasks.sleep(0, loop=self)
1484
1485        if self._debug:
1486            logger.info("%r is serving", server)
1487        return server
1488
1489    async def connect_accepted_socket(
1490            self, protocol_factory, sock,
1491            *, ssl=None,
1492            ssl_handshake_timeout=None):
1493        """Handle an accepted connection.
1494
1495        This is used by servers that accept connections outside of
1496        asyncio but that use asyncio to handle connections.
1497
1498        This method is a coroutine.  When completed, the coroutine
1499        returns a (transport, protocol) pair.
1500        """
1501        if sock.type != socket.SOCK_STREAM:
1502            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1503
1504        if ssl_handshake_timeout is not None and not ssl:
1505            raise ValueError(
1506                'ssl_handshake_timeout is only meaningful with ssl')
1507
1508        transport, protocol = await self._create_connection_transport(
1509            sock, protocol_factory, ssl, '', server_side=True,
1510            ssl_handshake_timeout=ssl_handshake_timeout)
1511        if self._debug:
1512            # Get the socket from the transport because SSL transport closes
1513            # the old socket and creates a new SSL socket
1514            sock = transport.get_extra_info('socket')
1515            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1516        return transport, protocol
1517
1518    async def connect_read_pipe(self, protocol_factory, pipe):
1519        protocol = protocol_factory()
1520        waiter = self.create_future()
1521        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1522
1523        try:
1524            await waiter
1525        except:
1526            transport.close()
1527            raise
1528
1529        if self._debug:
1530            logger.debug('Read pipe %r connected: (%r, %r)',
1531                         pipe.fileno(), transport, protocol)
1532        return transport, protocol
1533
1534    async def connect_write_pipe(self, protocol_factory, pipe):
1535        protocol = protocol_factory()
1536        waiter = self.create_future()
1537        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1538
1539        try:
1540            await waiter
1541        except:
1542            transport.close()
1543            raise
1544
1545        if self._debug:
1546            logger.debug('Write pipe %r connected: (%r, %r)',
1547                         pipe.fileno(), transport, protocol)
1548        return transport, protocol
1549
1550    def _log_subprocess(self, msg, stdin, stdout, stderr):
1551        info = [msg]
1552        if stdin is not None:
1553            info.append(f'stdin={_format_pipe(stdin)}')
1554        if stdout is not None and stderr == subprocess.STDOUT:
1555            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1556        else:
1557            if stdout is not None:
1558                info.append(f'stdout={_format_pipe(stdout)}')
1559            if stderr is not None:
1560                info.append(f'stderr={_format_pipe(stderr)}')
1561        logger.debug(' '.join(info))
1562
1563    async def subprocess_shell(self, protocol_factory, cmd, *,
1564                               stdin=subprocess.PIPE,
1565                               stdout=subprocess.PIPE,
1566                               stderr=subprocess.PIPE,
1567                               universal_newlines=False,
1568                               shell=True, bufsize=0,
1569                               encoding=None, errors=None, text=None,
1570                               **kwargs):
1571        if not isinstance(cmd, (bytes, str)):
1572            raise ValueError("cmd must be a string")
1573        if universal_newlines:
1574            raise ValueError("universal_newlines must be False")
1575        if not shell:
1576            raise ValueError("shell must be True")
1577        if bufsize != 0:
1578            raise ValueError("bufsize must be 0")
1579        if text:
1580            raise ValueError("text must be False")
1581        if encoding is not None:
1582            raise ValueError("encoding must be None")
1583        if errors is not None:
1584            raise ValueError("errors must be None")
1585
1586        protocol = protocol_factory()
1587        debug_log = None
1588        if self._debug:
1589            # don't log parameters: they may contain sensitive information
1590            # (password) and may be too long
1591            debug_log = 'run shell command %r' % cmd
1592            self._log_subprocess(debug_log, stdin, stdout, stderr)
1593        transport = await self._make_subprocess_transport(
1594            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1595        if self._debug and debug_log is not None:
1596            logger.info('%s: %r', debug_log, transport)
1597        return transport, protocol
1598
1599    async def subprocess_exec(self, protocol_factory, program, *args,
1600                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1601                              stderr=subprocess.PIPE, universal_newlines=False,
1602                              shell=False, bufsize=0,
1603                              encoding=None, errors=None, text=None,
1604                              **kwargs):
1605        if universal_newlines:
1606            raise ValueError("universal_newlines must be False")
1607        if shell:
1608            raise ValueError("shell must be False")
1609        if bufsize != 0:
1610            raise ValueError("bufsize must be 0")
1611        if text:
1612            raise ValueError("text must be False")
1613        if encoding is not None:
1614            raise ValueError("encoding must be None")
1615        if errors is not None:
1616            raise ValueError("errors must be None")
1617
1618        popen_args = (program,) + args
1619        protocol = protocol_factory()
1620        debug_log = None
1621        if self._debug:
1622            # don't log parameters: they may contain sensitive information
1623            # (password) and may be too long
1624            debug_log = f'execute program {program!r}'
1625            self._log_subprocess(debug_log, stdin, stdout, stderr)
1626        transport = await self._make_subprocess_transport(
1627            protocol, popen_args, False, stdin, stdout, stderr,
1628            bufsize, **kwargs)
1629        if self._debug and debug_log is not None:
1630            logger.info('%s: %r', debug_log, transport)
1631        return transport, protocol
1632
1633    def get_exception_handler(self):
1634        """Return an exception handler, or None if the default one is in use.
1635        """
1636        return self._exception_handler
1637
1638    def set_exception_handler(self, handler):
1639        """Set handler as the new event loop exception handler.
1640
1641        If handler is None, the default exception handler will
1642        be set.
1643
1644        If handler is a callable object, it should have a
1645        signature matching '(loop, context)', where 'loop'
1646        will be a reference to the active event loop, 'context'
1647        will be a dict object (see `call_exception_handler()`
1648        documentation for details about context).
1649        """
1650        if handler is not None and not callable(handler):
1651            raise TypeError(f'A callable object or None is expected, '
1652                            f'got {handler!r}')
1653        self._exception_handler = handler
1654
1655    def default_exception_handler(self, context):
1656        """Default exception handler.
1657
1658        This is called when an exception occurs and no exception
1659        handler is set, and can be called by a custom exception
1660        handler that wants to defer to the default behavior.
1661
1662        This default handler logs the error message and other
1663        context-dependent information.  In debug mode, a truncated
1664        stack trace is also appended showing where the given object
1665        (e.g. a handle or future or task) was created, if any.
1666
1667        The context parameter has the same meaning as in
1668        `call_exception_handler()`.
1669        """
1670        message = context.get('message')
1671        if not message:
1672            message = 'Unhandled exception in event loop'
1673
1674        exception = context.get('exception')
1675        if exception is not None:
1676            exc_info = (type(exception), exception, exception.__traceback__)
1677        else:
1678            exc_info = False
1679
1680        if ('source_traceback' not in context and
1681                self._current_handle is not None and
1682                self._current_handle._source_traceback):
1683            context['handle_traceback'] = \
1684                self._current_handle._source_traceback
1685
1686        log_lines = [message]
1687        for key in sorted(context):
1688            if key in {'message', 'exception'}:
1689                continue
1690            value = context[key]
1691            if key == 'source_traceback':
1692                tb = ''.join(traceback.format_list(value))
1693                value = 'Object created at (most recent call last):\n'
1694                value += tb.rstrip()
1695            elif key == 'handle_traceback':
1696                tb = ''.join(traceback.format_list(value))
1697                value = 'Handle created at (most recent call last):\n'
1698                value += tb.rstrip()
1699            else:
1700                value = repr(value)
1701            log_lines.append(f'{key}: {value}')
1702
1703        logger.error('\n'.join(log_lines), exc_info=exc_info)
1704
1705    def call_exception_handler(self, context):
1706        """Call the current event loop's exception handler.
1707
1708        The context argument is a dict containing the following keys:
1709
1710        - 'message': Error message;
1711        - 'exception' (optional): Exception object;
1712        - 'future' (optional): Future instance;
1713        - 'task' (optional): Task instance;
1714        - 'handle' (optional): Handle instance;
1715        - 'protocol' (optional): Protocol instance;
1716        - 'transport' (optional): Transport instance;
1717        - 'socket' (optional): Socket instance;
1718        - 'asyncgen' (optional): Asynchronous generator that caused
1719                                 the exception.
1720
1721        New keys maybe introduced in the future.
1722
1723        Note: do not overload this method in an event loop subclass.
1724        For custom exception handling, use the
1725        `set_exception_handler()` method.
1726        """
1727        if self._exception_handler is None:
1728            try:
1729                self.default_exception_handler(context)
1730            except (SystemExit, KeyboardInterrupt):
1731                raise
1732            except BaseException:
1733                # Second protection layer for unexpected errors
1734                # in the default implementation, as well as for subclassed
1735                # event loops with overloaded "default_exception_handler".
1736                logger.error('Exception in default exception handler',
1737                             exc_info=True)
1738        else:
1739            try:
1740                self._exception_handler(self, context)
1741            except (SystemExit, KeyboardInterrupt):
1742                raise
1743            except BaseException as exc:
1744                # Exception in the user set custom exception handler.
1745                try:
1746                    # Let's try default handler.
1747                    self.default_exception_handler({
1748                        'message': 'Unhandled error in exception handler',
1749                        'exception': exc,
1750                        'context': context,
1751                    })
1752                except (SystemExit, KeyboardInterrupt):
1753                    raise
1754                except BaseException:
1755                    # Guard 'default_exception_handler' in case it is
1756                    # overloaded.
1757                    logger.error('Exception in default exception handler '
1758                                 'while handling an unexpected error '
1759                                 'in custom exception handler',
1760                                 exc_info=True)
1761
1762    def _add_callback(self, handle):
1763        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1764        assert isinstance(handle, events.Handle), 'A Handle is required here'
1765        if handle._cancelled:
1766            return
1767        assert not isinstance(handle, events.TimerHandle)
1768        self._ready.append(handle)
1769
1770    def _add_callback_signalsafe(self, handle):
1771        """Like _add_callback() but called from a signal handler."""
1772        self._add_callback(handle)
1773        self._write_to_self()
1774
1775    def _timer_handle_cancelled(self, handle):
1776        """Notification that a TimerHandle has been cancelled."""
1777        if handle._scheduled:
1778            self._timer_cancelled_count += 1
1779
1780    def _run_once(self):
1781        """Run one full iteration of the event loop.
1782
1783        This calls all currently ready callbacks, polls for I/O,
1784        schedules the resulting callbacks, and finally schedules
1785        'call_later' callbacks.
1786        """
1787
1788        sched_count = len(self._scheduled)
1789        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1790            self._timer_cancelled_count / sched_count >
1791                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1792            # Remove delayed calls that were cancelled if their number
1793            # is too high
1794            new_scheduled = []
1795            for handle in self._scheduled:
1796                if handle._cancelled:
1797                    handle._scheduled = False
1798                else:
1799                    new_scheduled.append(handle)
1800
1801            heapq.heapify(new_scheduled)
1802            self._scheduled = new_scheduled
1803            self._timer_cancelled_count = 0
1804        else:
1805            # Remove delayed calls that were cancelled from head of queue.
1806            while self._scheduled and self._scheduled[0]._cancelled:
1807                self._timer_cancelled_count -= 1
1808                handle = heapq.heappop(self._scheduled)
1809                handle._scheduled = False
1810
1811        timeout = None
1812        if self._ready or self._stopping:
1813            timeout = 0
1814        elif self._scheduled:
1815            # Compute the desired timeout.
1816            when = self._scheduled[0]._when
1817            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1818
1819        event_list = self._selector.select(timeout)
1820        self._process_events(event_list)
1821
1822        # Handle 'later' callbacks that are ready.
1823        end_time = self.time() + self._clock_resolution
1824        while self._scheduled:
1825            handle = self._scheduled[0]
1826            if handle._when >= end_time:
1827                break
1828            handle = heapq.heappop(self._scheduled)
1829            handle._scheduled = False
1830            self._ready.append(handle)
1831
1832        # This is the only place where callbacks are actually *called*.
1833        # All other places just add them to ready.
1834        # Note: We run all currently scheduled callbacks, but not any
1835        # callbacks scheduled by callbacks run this time around --
1836        # they will be run the next time (after another I/O poll).
1837        # Use an idiom that is thread-safe without using locks.
1838        ntodo = len(self._ready)
1839        for i in range(ntodo):
1840            handle = self._ready.popleft()
1841            if handle._cancelled:
1842                continue
1843            if self._debug:
1844                try:
1845                    self._current_handle = handle
1846                    t0 = self.time()
1847                    handle._run()
1848                    dt = self.time() - t0
1849                    if dt >= self.slow_callback_duration:
1850                        logger.warning('Executing %s took %.3f seconds',
1851                                       _format_handle(handle), dt)
1852                finally:
1853                    self._current_handle = None
1854            else:
1855                handle._run()
1856        handle = None  # Needed to break cycles when an exception occurs.
1857
1858    def _set_coroutine_origin_tracking(self, enabled):
1859        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1860            return
1861
1862        if enabled:
1863            self._coroutine_origin_tracking_saved_depth = (
1864                sys.get_coroutine_origin_tracking_depth())
1865            sys.set_coroutine_origin_tracking_depth(
1866                constants.DEBUG_STACK_DEPTH)
1867        else:
1868            sys.set_coroutine_origin_tracking_depth(
1869                self._coroutine_origin_tracking_saved_depth)
1870
1871        self._coroutine_origin_tracking_enabled = enabled
1872
1873    def get_debug(self):
1874        return self._debug
1875
1876    def set_debug(self, enabled):
1877        self._debug = enabled
1878
1879        if self.is_running():
1880            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1881