• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import concurrent.futures
18import heapq
19import inspect
20import itertools
21import logging
22import os
23import socket
24import subprocess
25import threading
26import time
27import traceback
28import sys
29import warnings
30import weakref
31
32from . import compat
33from . import coroutines
34from . import events
35from . import futures
36from . import tasks
37from .coroutines import coroutine
38from .log import logger
39
40
# Names exported by "from <module> import *".
__all__ = ['BaseEventLoop']


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5

# Exceptions which must not call the exception handler in fatal error
# methods (_fatal_error()).
_FATAL_ERROR_IGNORE = (BrokenPipeError,
                       ConnectionResetError, ConnectionAbortedError)
56
57
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is bound to a Task (its ``__self__`` is a
    tasks.Task), the repr of that task is returned; otherwise str(handle).
    """
    owner = getattr(handle._callback, '__self__', None)
    if not isinstance(owner, tasks.Task):
        return str(handle)
    # The callback belongs to a task: describe the task itself.
    return repr(owner)
65
66
67def _format_pipe(fd):
68    if fd == subprocess.PIPE:
69        return '<pipe>'
70    elif fd == subprocess.STDOUT:
71        return '<stdout>'
72    else:
73        return repr(fd)
74
75
76def _set_reuseport(sock):
77    if not hasattr(socket, 'SO_REUSEPORT'):
78        raise ValueError('reuse_port not supported by socket module')
79    else:
80        try:
81            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
82        except OSError:
83            raise ValueError('reuse_port not supported by socket module, '
84                             'SO_REUSEPORT defined but not implemented.')
85
86
87def _is_stream_socket(sock):
88    # Linux's socket.type is a bitmask that can include extra info
89    # about socket, therefore we can't do simple
90    # `sock_type == socket.SOCK_STREAM`.
91    return (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM
92
93
94def _is_dgram_socket(sock):
95    # Linux's socket.type is a bitmask that can include extra info
96    # about socket, therefore we can't do simple
97    # `sock_type == socket.SOCK_DGRAM`.
98    return (sock.type & socket.SOCK_DGRAM) == socket.SOCK_DGRAM
99
100
101def _ipaddr_info(host, port, family, type, proto):
102    # Try to skip getaddrinfo if "host" is already an IP. Users might have
103    # handled name resolution in their own code and pass in resolved IPs.
104    if not hasattr(socket, 'inet_pton'):
105        return
106
107    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
108            host is None:
109        return None
110
111    if type == socket.SOCK_STREAM:
112        # Linux only:
113        #    getaddrinfo() can raise when socket.type is a bit mask.
114        #    So if socket.type is a bit mask of SOCK_STREAM, and say
115        #    SOCK_NONBLOCK, we simply return None, which will trigger
116        #    a call to getaddrinfo() letting it process this request.
117        proto = socket.IPPROTO_TCP
118    elif type == socket.SOCK_DGRAM:
119        proto = socket.IPPROTO_UDP
120    else:
121        return None
122
123    if port is None:
124        port = 0
125    elif isinstance(port, bytes) and port == b'':
126        port = 0
127    elif isinstance(port, str) and port == '':
128        port = 0
129    else:
130        # If port's a service name like "http", don't skip getaddrinfo.
131        try:
132            port = int(port)
133        except (TypeError, ValueError):
134            return None
135
136    if family == socket.AF_UNSPEC:
137        afs = [socket.AF_INET]
138        if hasattr(socket, 'AF_INET6'):
139            afs.append(socket.AF_INET6)
140    else:
141        afs = [family]
142
143    if isinstance(host, bytes):
144        host = host.decode('idna')
145    if '%' in host:
146        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
147        # like '::1%lo0'.
148        return None
149
150    for af in afs:
151        try:
152            socket.inet_pton(af, host)
153            # The host has already been resolved.
154            return af, type, proto, '', (host, port)
155        except OSError:
156            pass
157
158    # "host" is not an IP address.
159    return None
160
161
def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
                     flags=0, loop):
    """Return a future/awaitable yielding address infos for *address*.

    Only the first two items of *address* (host, port) are used.  When
    _ipaddr_info() recognises the host as an IP literal, a future created
    by *loop* and already holding a one-element list is returned;
    otherwise the result of loop.getaddrinfo() is returned.
    """
    host, port = address[:2]
    resolved = _ipaddr_info(host, port, family, type, proto)
    if resolved is None:
        return loop.getaddrinfo(host, port, family=family, type=type,
                                proto=proto, flags=flags)
    # "host" is already a resolved IP.
    done = loop.create_future()
    done.set_result([resolved])
    return done
174
175
176def _run_until_complete_cb(fut):
177    exc = fut._exception
178    if (isinstance(exc, BaseException)
179    and not isinstance(exc, Exception)):
180        # Issue #22429: run_forever() already finished, no need to
181        # stop it.
182        return
183    fut._loop.stop()
184
185
class Server(events.AbstractServer):
    """Holds a loop's listening sockets plus an attach counter.

    close() stops serving on every socket; wait_closed() completes once
    the server is closed and every _attach() has been matched by a
    _detach().
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        # Number of _attach() calls not yet balanced by _detach().
        self._active_count = 0
        # Futures handed out by wait_closed(); None after _wakeup().
        self._waiters = []

    def __repr__(self):
        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)

    def _attach(self):
        """Count one more active user; only valid before close()."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Drop one active user; wake waiters if closed and now idle."""
        assert self._active_count > 0
        self._active_count -= 1
        if self.sockets is None and self._active_count == 0:
            self._wakeup()

    def close(self):
        """Stop serving on all sockets; a second call is a no-op."""
        listening = self.sockets
        if listening is None:
            return
        self.sockets = None
        for sock in listening:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending wait_closed() future."""
        pending, self._waiters = self._waiters, None
        for fut in pending:
            if fut.done():
                continue
            fut.set_result(fut)

    @coroutine
    def wait_closed(self):
        """Wait until _wakeup() runs; return at once if nothing to wait for."""
        if self.sockets is None or self._waiters is None:
            return
        fut = self._loop.create_future()
        self._waiters.append(fut)
        yield from fut
231
232
233class BaseEventLoop(events.AbstractEventLoop):
234
235    def __init__(self):
236        self._timer_cancelled_count = 0
237        self._closed = False
238        self._stopping = False
239        self._ready = collections.deque()
240        self._scheduled = []
241        self._default_executor = None
242        self._internal_fds = 0
243        # Identifier of the thread running the event loop, or None if the
244        # event loop is not running
245        self._thread_id = None
246        self._clock_resolution = time.get_clock_info('monotonic').resolution
247        self._exception_handler = None
248        self.set_debug((not sys.flags.ignore_environment
249                        and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
250        # In debug mode, if the execution of a callback or a step of a task
251        # exceed this duration in seconds, the slow callback/task is logged.
252        self.slow_callback_duration = 0.1
253        self._current_handle = None
254        self._task_factory = None
255        self._coroutine_wrapper_set = False
256
257        if hasattr(sys, 'get_asyncgen_hooks'):
258            # Python >= 3.6
259            # A weak set of all asynchronous generators that are
260            # being iterated by the loop.
261            self._asyncgens = weakref.WeakSet()
262        else:
263            self._asyncgens = None
264
265        # Set to True when `loop.shutdown_asyncgens` is called.
266        self._asyncgens_shutdown_called = False
267
268    def __repr__(self):
269        return ('<%s running=%s closed=%s debug=%s>'
270                % (self.__class__.__name__, self.is_running(),
271                   self.is_closed(), self.get_debug()))
272
    def create_future(self):
        """Create a Future object attached to the loop.

        The future is a futures.Future constructed with loop=self.
        """
        return futures.Future(loop=self)
276
277    def create_task(self, coro):
278        """Schedule a coroutine object.
279
280        Return a task object.
281        """
282        self._check_closed()
283        if self._task_factory is None:
284            task = tasks.Task(coro, loop=self)
285            if task._source_traceback:
286                del task._source_traceback[-1]
287        else:
288            task = self._task_factory(self, coro)
289        return task
290
291    def set_task_factory(self, factory):
292        """Set a task factory that will be used by loop.create_task().
293
294        If factory is None the default task factory will be set.
295
296        If factory is a callable, it should have a signature matching
297        '(loop, coro)', where 'loop' will be a reference to the active
298        event loop, 'coro' will be a coroutine object.  The callable
299        must return a Future.
300        """
301        if factory is not None and not callable(factory):
302            raise TypeError('task factory must be a callable or None')
303        self._task_factory = factory
304
    def get_task_factory(self):
        """Return a task factory, or None if the default one is in use.

        This is the value last stored by set_task_factory().
        """
        return self._task_factory
308
    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport.

        Not implemented in this base class: always raises
        NotImplementedError.
        """
        raise NotImplementedError
313
    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create SSL transport.

        Not implemented in this base class: always raises
        NotImplementedError.
        """
        raise NotImplementedError
319
    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create datagram transport.

        Not implemented in this base class: always raises
        NotImplementedError.
        """
        raise NotImplementedError
324
    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport.

        Not implemented in this base class: always raises
        NotImplementedError.
        """
        raise NotImplementedError
329
    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport.

        Not implemented in this base class: always raises
        NotImplementedError.
        """
        raise NotImplementedError
334
    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport.

        Not implemented in this base class: the body raises
        NotImplementedError.
        """
        raise NotImplementedError
341
    def _write_to_self(self):
        """Write a byte to self-pipe, to wake up the event loop.

        This may be called from a different thread.

        The subclass is responsible for implementing the self-pipe; this
        base implementation always raises NotImplementedError.
        """
        raise NotImplementedError
350
    def _process_events(self, event_list):
        """Process selector events.

        Not implemented in this base class: always raises
        NotImplementedError.
        """
        raise NotImplementedError
354
    def _check_closed(self):
        """Raise RuntimeError if the loop has been closed."""
        if self._closed:
            raise RuntimeError('Event loop is closed')
358
359    def _asyncgen_finalizer_hook(self, agen):
360        self._asyncgens.discard(agen)
361        if not self.is_closed():
362            self.create_task(agen.aclose())
363            # Wake up the loop if the finalizer was called from
364            # a different thread.
365            self._write_to_self()
366
367    def _asyncgen_firstiter_hook(self, agen):
368        if self._asyncgens_shutdown_called:
369            warnings.warn(
370                "asynchronous generator {!r} was scheduled after "
371                "loop.shutdown_asyncgens() call".format(agen),
372                ResourceWarning, source=self)
373
374        self._asyncgens.add(agen)
375
376    @coroutine
377    def shutdown_asyncgens(self):
378        """Shutdown all active asynchronous generators."""
379        self._asyncgens_shutdown_called = True
380
381        if self._asyncgens is None or not len(self._asyncgens):
382            # If Python version is <3.6 or we don't have any asynchronous
383            # generators alive.
384            return
385
386        closing_agens = list(self._asyncgens)
387        self._asyncgens.clear()
388
389        shutdown_coro = tasks.gather(
390            *[ag.aclose() for ag in closing_agens],
391            return_exceptions=True,
392            loop=self)
393
394        results = yield from shutdown_coro
395        for result, agen in zip(results, closing_agens):
396            if isinstance(result, Exception):
397                self.call_exception_handler({
398                    'message': 'an error occurred during closing of '
399                               'asynchronous generator {!r}'.format(agen),
400                    'exception': result,
401                    'asyncgen': agen
402                })
403
    def run_forever(self):
        """Run until stop() is called.

        Raises RuntimeError if the loop is closed, if this loop is
        already running, or if events._get_running_loop() reports that
        another loop is running.
        """
        self._check_closed()
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
        self._set_coroutine_wrapper(self._debug)
        # Mark the loop as running: is_running() tests this attribute.
        self._thread_id = threading.get_ident()
        if self._asyncgens is not None:
            # Install this loop's asynchronous generator hooks, keeping the
            # previous ones so they can be restored on exit.
            old_agen_hooks = sys.get_asyncgen_hooks()
            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                   finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                # stop() only sets this flag; it is honoured after a
                # complete iteration.
                if self._stopping:
                    break
        finally:
            # Undo the setup above whether the loop was stopped or an
            # exception escaped from _run_once().
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_wrapper(False)
            if self._asyncgens is not None:
                sys.set_asyncgen_hooks(*old_agen_hooks)
431
432    def run_until_complete(self, future):
433        """Run until the Future is done.
434
435        If the argument is a coroutine, it is wrapped in a Task.
436
437        WARNING: It would be disastrous to call run_until_complete()
438        with the same coroutine twice -- it would wrap it in two
439        different Tasks and that can't be good.
440
441        Return the Future's result, or raise its exception.
442        """
443        self._check_closed()
444
445        new_task = not futures.isfuture(future)
446        future = tasks.ensure_future(future, loop=self)
447        if new_task:
448            # An exception is raised if the future didn't complete, so there
449            # is no need to log the "destroy pending task" message
450            future._log_destroy_pending = False
451
452        future.add_done_callback(_run_until_complete_cb)
453        try:
454            self.run_forever()
455        except:
456            if new_task and future.done() and not future.cancelled():
457                # The coroutine raised a BaseException. Consume the exception
458                # to not log a warning, the caller doesn't have access to the
459                # local task.
460                future.exception()
461            raise
462        future.remove_done_callback(_run_until_complete_cb)
463        if not future.done():
464            raise RuntimeError('Event loop stopped before Future completed.')
465
466        return future.result()
467
    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run.  This simply informs
        run_forever to stop looping after a complete iteration.
        """
        # Only a flag is set here; nothing is interrupted.
        self._stopping = True
475
476    def close(self):
477        """Close the event loop.
478
479        This clears the queues and shuts down the executor,
480        but does not wait for the executor to finish.
481
482        The event loop must not be running.
483        """
484        if self.is_running():
485            raise RuntimeError("Cannot close a running event loop")
486        if self._closed:
487            return
488        if self._debug:
489            logger.debug("Close %r", self)
490        self._closed = True
491        self._ready.clear()
492        self._scheduled.clear()
493        executor = self._default_executor
494        if executor is not None:
495            self._default_executor = None
496            executor.shutdown(wait=False)
497
    def is_closed(self):
        """Return True if the event loop was closed."""
        return self._closed
501
    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            """Warn about a loop that was never closed; close it if idle."""
            if not self.is_closed():
                warnings.warn("unclosed event loop %r" % self, ResourceWarning,
                              source=self)
                # close() raises on a running loop, so only close when idle.
                if not self.is_running():
                    self.close()
512
    def is_running(self):
        """Return True if the event loop is running.

        The loop counts as running while a thread identifier is recorded
        in _thread_id.
        """
        return (self._thread_id is not None)
516
    def time(self):
        """Return the time according to the event loop's clock.

        This is a float expressed in seconds since an epoch, but the
        epoch, precision, accuracy and drift are unspecified and may
        differ per event loop.

        This implementation returns time.monotonic().
        """
        return time.monotonic()
525
526    def call_later(self, delay, callback, *args):
527        """Arrange for a callback to be called at a given time.
528
529        Return a Handle: an opaque object with a cancel() method that
530        can be used to cancel the call.
531
532        The delay can be an int or float, expressed in seconds.  It is
533        always relative to the current time.
534
535        Each callback will be called exactly once.  If two callbacks
536        are scheduled for exactly the same time, it undefined which
537        will be called first.
538
539        Any positional arguments after the callback will be passed to
540        the callback when it is called.
541        """
542        timer = self.call_at(self.time() + delay, callback, *args)
543        if timer._source_traceback:
544            del timer._source_traceback[-1]
545        return timer
546
547    def call_at(self, when, callback, *args):
548        """Like call_later(), but uses an absolute time.
549
550        Absolute time corresponds to the event loop's time() method.
551        """
552        self._check_closed()
553        if self._debug:
554            self._check_thread()
555            self._check_callback(callback, 'call_at')
556        timer = events.TimerHandle(when, callback, args, self)
557        if timer._source_traceback:
558            del timer._source_traceback[-1]
559        heapq.heappush(self._scheduled, timer)
560        timer._scheduled = True
561        return timer
562
563    def call_soon(self, callback, *args):
564        """Arrange for a callback to be called as soon as possible.
565
566        This operates as a FIFO queue: callbacks are called in the
567        order in which they are registered.  Each callback will be
568        called exactly once.
569
570        Any positional arguments after the callback will be passed to
571        the callback when it is called.
572        """
573        self._check_closed()
574        if self._debug:
575            self._check_thread()
576            self._check_callback(callback, 'call_soon')
577        handle = self._call_soon(callback, args)
578        if handle._source_traceback:
579            del handle._source_traceback[-1]
580        return handle
581
582    def _check_callback(self, callback, method):
583        if (coroutines.iscoroutine(callback) or
584                coroutines.iscoroutinefunction(callback)):
585            raise TypeError(
586                "coroutines cannot be used with {}()".format(method))
587        if not callable(callback):
588            raise TypeError(
589                'a callable object was expected by {}(), got {!r}'.format(
590                    method, callback))
591
592
593    def _call_soon(self, callback, args):
594        handle = events.Handle(callback, args, self)
595        if handle._source_traceback:
596            del handle._source_traceback[-1]
597        self._ready.append(handle)
598        return handle
599
600    def _check_thread(self):
601        """Check that the current thread is the thread running the event loop.
602
603        Non-thread-safe methods of this class make this assumption and will
604        likely behave incorrectly when the assumption is violated.
605
606        Should only be called when (self._debug == True).  The caller is
607        responsible for checking this condition for performance reasons.
608        """
609        if self._thread_id is None:
610            return
611        thread_id = threading.get_ident()
612        if thread_id != self._thread_id:
613            raise RuntimeError(
614                "Non-thread-safe operation invoked on an event loop other "
615                "than the current one")
616
617    def call_soon_threadsafe(self, callback, *args):
618        """Like call_soon(), but thread-safe."""
619        self._check_closed()
620        if self._debug:
621            self._check_callback(callback, 'call_soon_threadsafe')
622        handle = self._call_soon(callback, args)
623        if handle._source_traceback:
624            del handle._source_traceback[-1]
625        self._write_to_self()
626        return handle
627
628    def run_in_executor(self, executor, func, *args):
629        self._check_closed()
630        if self._debug:
631            self._check_callback(func, 'run_in_executor')
632        if executor is None:
633            executor = self._default_executor
634            if executor is None:
635                executor = concurrent.futures.ThreadPoolExecutor()
636                self._default_executor = executor
637        return futures.wrap_future(executor.submit(func, *args), loop=self)
638
    def set_default_executor(self, executor):
        """Set the executor used by run_in_executor() when none is given."""
        self._default_executor = executor
641
642    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
643        msg = ["%s:%r" % (host, port)]
644        if family:
645            msg.append('family=%r' % family)
646        if type:
647            msg.append('type=%r' % type)
648        if proto:
649            msg.append('proto=%r' % proto)
650        if flags:
651            msg.append('flags=%r' % flags)
652        msg = ', '.join(msg)
653        logger.debug('Get address info %s', msg)
654
655        t0 = self.time()
656        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
657        dt = self.time() - t0
658
659        msg = ('Getting address info %s took %.3f ms: %r'
660               % (msg, dt * 1e3, addrinfo))
661        if dt >= self.slow_callback_duration:
662            logger.info(msg)
663        else:
664            logger.debug(msg)
665        return addrinfo
666
667    def getaddrinfo(self, host, port, *,
668                    family=0, type=0, proto=0, flags=0):
669        if self._debug:
670            return self.run_in_executor(None, self._getaddrinfo_debug,
671                                        host, port, family, type, proto, flags)
672        else:
673            return self.run_in_executor(None, socket.getaddrinfo,
674                                        host, port, family, type, proto, flags)
675
    def getnameinfo(self, sockaddr, flags=0):
        """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
        return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
678
    @coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given Internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background.  When successful, the coroutine returns a
        (transport, protocol) pair.

        Either host/port or an existing stream socket (sock) must be
        given, but not both.  ValueError is raised for inconsistent
        arguments; OSError is raised when no resolved address could be
        connected to.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # Resolve the remote address and, if given, the local address.
            f1 = _ensure_resolved((host, port), family=family,
                                  type=socket.SOCK_STREAM, proto=proto,
                                  flags=flags, loop=self)
            fs = [f1]
            if local_addr is not None:
                f2 = _ensure_resolved(local_addr, family=family,
                                      type=socket.SOCK_STREAM, proto=proto,
                                      flags=flags, loop=self)
                fs.append(f2)
            else:
                f2 = None

            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            # Try each resolved remote address in turn; the first one that
            # connects wins.  Errors are collected for the final report.
            # NOTE: the loop variables rebind the family/proto parameters.
            exceptions = []
            for family, type, proto, cname, address in infos:
                try:
                    sock = socket.socket(family=family, type=type, proto=proto)
                    sock.setblocking(False)
                    if f2 is not None:
                        # Bind to the first local address that works.
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, exc.strerror.lower()))
                                exceptions.append(exc)
                        else:
                            # No local address could be bound: give up on
                            # this remote address.
                            sock.close()
                            sock = None
                            continue
                    if self._debug:
                        logger.debug("connect %r to %r", sock, address)
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    # Any other exception: release the socket and propagate.
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                # Every address failed.
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            if not _is_stream_socket(sock):
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method, breaks backwards
                # compatibility.
                raise ValueError(
                    'A Stream Socket was expected, got {!r}'.format(sock))

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol
810
811    @coroutine
812    def _create_connection_transport(self, sock, protocol_factory, ssl,
813                                     server_hostname, server_side=False):
814
815        sock.setblocking(False)
816
817        protocol = protocol_factory()
818        waiter = self.create_future()
819        if ssl:
820            sslcontext = None if isinstance(ssl, bool) else ssl
821            transport = self._make_ssl_transport(
822                sock, protocol, sslcontext, waiter,
823                server_side=server_side, server_hostname=server_hostname)
824        else:
825            transport = self._make_socket_transport(sock, protocol, waiter)
826
827        try:
828            yield from waiter
829        except:
830            transport.close()
831            raise
832
833        return transport, protocol
834
    @coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0,
                                 reuse_address=None, reuse_port=None,
                                 allow_broadcast=None, sock=None):
        """Create datagram connection.

        Either wrap the datagram socket passed as *sock*, or create and
        configure a new SOCK_DGRAM socket from *local_addr*, *remote_addr*
        and/or *family*.

        When *sock* is given, none of the other socket-modifying
        arguments (local_addr, remote_addr, family, proto, flags,
        reuse_address, reuse_port, allow_broadcast) may be truthy.

        Otherwise the given addresses are resolved and grouped by
        (family, proto); the candidates are tried in order until a socket
        has been bound to the local address (if given) and connected to
        the remote address (if given).  When *reuse_address* is None it
        defaults to True on POSIX platforms other than Cygwin.

        Return a (transport, protocol) pair once the transport's waiter
        future has completed; if waiting fails the transport is closed and
        the exception propagates.

        Raise ValueError for an invalid argument combination or when no
        candidate has an address for every requested endpoint, and OSError
        when resolution returns nothing or when every candidate socket
        fails (the first collected error is re-raised).

        This method is a coroutine.
        """
        if sock is not None:
            # Caller-supplied socket: validate it and refuse any argument
            # that would otherwise be used to build/configure a socket.
            if not _is_dgram_socket(sock):
                raise ValueError(
                    'A UDP Socket was expected, got {!r}'.format(sock))
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_address or reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_address=reuse_address, reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(
                    '{}={}'.format(k, v) for k, v in opts.items() if v)
                raise ValueError(
                    'socket modifier keyword arguments can not be used '
                    'when sock is specified. ({})'.format(problems))
            sock.setblocking(False)
            # No remote address is handed to the transport in this branch.
            r_addr = None
        else:
            if not (local_addr or remote_addr):
                # Without any address an explicit family is required; a
                # single candidate with no addresses is used.
                if family == 0:
                    raise ValueError('unexpected address family')
                addr_pairs_info = (((family, proto), (None, None)),)
            else:
                # join address by (family, protocol)
                # Maps (family, proto) -> [local_address, remote_address];
                # insertion order is kept so candidates are tried in the
                # order resolution produced them.
                addr_infos = collections.OrderedDict()
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        # NOTE(review): assert statements are stripped under
                        # -O, so this check disappears in optimized mode.
                        assert isinstance(addr, tuple) and len(addr) == 2, (
                            '2-tuple is expected')

                        infos = yield from _ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            # A later result for the same key overwrites an
                            # earlier one for this endpoint.
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            # OSErrors from failed candidates; the first is re-raised if
            # no candidate succeeds.
            exceptions = []

            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'

            # Try each candidate until one socket is fully set up.  Note
            # that the loop rebinds the 'family' and 'proto' parameters.
            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        yield from self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    # Remember the failure and move on to the next candidate.
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    # Any other exception aborts the whole call, but the
                    # socket must still be released first.
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    # This candidate worked; keep 'sock' and 'r_addr'.
                    break
            else:
                # Every candidate failed with OSError.
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            # With a local address the endpoint is logged at INFO level,
            # otherwise at DEBUG level.
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            yield from waiter
        except:
            # Close the transport on any failure (bare except is deliberate:
            # cleanup must also happen for non-Exception errors).
            transport.close()
            raise

        return transport, protocol
956
957    @coroutine
958    def _create_server_getaddrinfo(self, host, port, family, flags):
959        infos = yield from _ensure_resolved((host, port), family=family,
960                                            type=socket.SOCK_STREAM,
961                                            flags=flags, loop=self)
962        if not infos:
963            raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
964        return infos
965
966    @coroutine
967    def create_server(self, protocol_factory, host=None, port=None,
968                      *,
969                      family=socket.AF_UNSPEC,
970                      flags=socket.AI_PASSIVE,
971                      sock=None,
972                      backlog=100,
973                      ssl=None,
974                      reuse_address=None,
975                      reuse_port=None):
976        """Create a TCP server.
977
978        The host parameter can be a string, in that case the TCP server is bound
979        to host and port.
980
981        The host parameter can also be a sequence of strings and in that case
982        the TCP server is bound to all hosts of the sequence. If a host
983        appears multiple times (possibly indirectly e.g. when hostnames
984        resolve to the same IP address), the server is only bound once to that
985        host.
986
987        Return a Server object which can be used to stop the service.
988
989        This method is a coroutine.
990        """
991        if isinstance(ssl, bool):
992            raise TypeError('ssl argument must be an SSLContext or None')
993        if host is not None or port is not None:
994            if sock is not None:
995                raise ValueError(
996                    'host/port and sock can not be specified at the same time')
997
998            AF_INET6 = getattr(socket, 'AF_INET6', 0)
999            if reuse_address is None:
1000                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
1001            sockets = []
1002            if host == '':
1003                hosts = [None]
1004            elif (isinstance(host, str) or
1005                  not isinstance(host, collections.Iterable)):
1006                hosts = [host]
1007            else:
1008                hosts = host
1009
1010            fs = [self._create_server_getaddrinfo(host, port, family=family,
1011                                                  flags=flags)
1012                  for host in hosts]
1013            infos = yield from tasks.gather(*fs, loop=self)
1014            infos = set(itertools.chain.from_iterable(infos))
1015
1016            completed = False
1017            try:
1018                for res in infos:
1019                    af, socktype, proto, canonname, sa = res
1020                    try:
1021                        sock = socket.socket(af, socktype, proto)
1022                    except socket.error:
1023                        # Assume it's a bad family/type/protocol combination.
1024                        if self._debug:
1025                            logger.warning('create_server() failed to create '
1026                                           'socket.socket(%r, %r, %r)',
1027                                           af, socktype, proto, exc_info=True)
1028                        continue
1029                    sockets.append(sock)
1030                    if reuse_address:
1031                        sock.setsockopt(
1032                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1033                    if reuse_port:
1034                        _set_reuseport(sock)
1035                    # Disable IPv4/IPv6 dual stack support (enabled by
1036                    # default on Linux) which makes a single socket
1037                    # listen on both address families.
1038                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
1039                        sock.setsockopt(socket.IPPROTO_IPV6,
1040                                        socket.IPV6_V6ONLY,
1041                                        True)
1042                    try:
1043                        sock.bind(sa)
1044                    except OSError as err:
1045                        raise OSError(err.errno, 'error while attempting '
1046                                      'to bind on address %r: %s'
1047                                      % (sa, err.strerror.lower()))
1048                completed = True
1049            finally:
1050                if not completed:
1051                    for sock in sockets:
1052                        sock.close()
1053        else:
1054            if sock is None:
1055                raise ValueError('Neither host/port nor sock were specified')
1056            if not _is_stream_socket(sock):
1057                raise ValueError(
1058                    'A Stream Socket was expected, got {!r}'.format(sock))
1059            sockets = [sock]
1060
1061        server = Server(self, sockets)
1062        for sock in sockets:
1063            sock.listen(backlog)
1064            sock.setblocking(False)
1065            self._start_serving(protocol_factory, sock, ssl, server, backlog)
1066        if self._debug:
1067            logger.info("%r is serving", server)
1068        return server
1069
1070    @coroutine
1071    def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
1072        """Handle an accepted connection.
1073
1074        This is used by servers that accept connections outside of
1075        asyncio but that use asyncio to handle connections.
1076
1077        This method is a coroutine.  When completed, the coroutine
1078        returns a (transport, protocol) pair.
1079        """
1080        if not _is_stream_socket(sock):
1081            raise ValueError(
1082                'A Stream Socket was expected, got {!r}'.format(sock))
1083
1084        transport, protocol = yield from self._create_connection_transport(
1085            sock, protocol_factory, ssl, '', server_side=True)
1086        if self._debug:
1087            # Get the socket from the transport because SSL transport closes
1088            # the old socket and creates a new SSL socket
1089            sock = transport.get_extra_info('socket')
1090            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1091        return transport, protocol
1092
1093    @coroutine
1094    def connect_read_pipe(self, protocol_factory, pipe):
1095        protocol = protocol_factory()
1096        waiter = self.create_future()
1097        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1098
1099        try:
1100            yield from waiter
1101        except:
1102            transport.close()
1103            raise
1104
1105        if self._debug:
1106            logger.debug('Read pipe %r connected: (%r, %r)',
1107                         pipe.fileno(), transport, protocol)
1108        return transport, protocol
1109
1110    @coroutine
1111    def connect_write_pipe(self, protocol_factory, pipe):
1112        protocol = protocol_factory()
1113        waiter = self.create_future()
1114        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1115
1116        try:
1117            yield from waiter
1118        except:
1119            transport.close()
1120            raise
1121
1122        if self._debug:
1123            logger.debug('Write pipe %r connected: (%r, %r)',
1124                         pipe.fileno(), transport, protocol)
1125        return transport, protocol
1126
1127    def _log_subprocess(self, msg, stdin, stdout, stderr):
1128        info = [msg]
1129        if stdin is not None:
1130            info.append('stdin=%s' % _format_pipe(stdin))
1131        if stdout is not None and stderr == subprocess.STDOUT:
1132            info.append('stdout=stderr=%s' % _format_pipe(stdout))
1133        else:
1134            if stdout is not None:
1135                info.append('stdout=%s' % _format_pipe(stdout))
1136            if stderr is not None:
1137                info.append('stderr=%s' % _format_pipe(stderr))
1138        logger.debug(' '.join(info))
1139
1140    @coroutine
1141    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
1142                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1143                         universal_newlines=False, shell=True, bufsize=0,
1144                         **kwargs):
1145        if not isinstance(cmd, (bytes, str)):
1146            raise ValueError("cmd must be a string")
1147        if universal_newlines:
1148            raise ValueError("universal_newlines must be False")
1149        if not shell:
1150            raise ValueError("shell must be True")
1151        if bufsize != 0:
1152            raise ValueError("bufsize must be 0")
1153        protocol = protocol_factory()
1154        if self._debug:
1155            # don't log parameters: they may contain sensitive information
1156            # (password) and may be too long
1157            debug_log = 'run shell command %r' % cmd
1158            self._log_subprocess(debug_log, stdin, stdout, stderr)
1159        transport = yield from self._make_subprocess_transport(
1160            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1161        if self._debug:
1162            logger.info('%s: %r', debug_log, transport)
1163        return transport, protocol
1164
1165    @coroutine
1166    def subprocess_exec(self, protocol_factory, program, *args,
1167                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1168                        stderr=subprocess.PIPE, universal_newlines=False,
1169                        shell=False, bufsize=0, **kwargs):
1170        if universal_newlines:
1171            raise ValueError("universal_newlines must be False")
1172        if shell:
1173            raise ValueError("shell must be False")
1174        if bufsize != 0:
1175            raise ValueError("bufsize must be 0")
1176        popen_args = (program,) + args
1177        for arg in popen_args:
1178            if not isinstance(arg, (str, bytes)):
1179                raise TypeError("program arguments must be "
1180                                "a bytes or text string, not %s"
1181                                % type(arg).__name__)
1182        protocol = protocol_factory()
1183        if self._debug:
1184            # don't log parameters: they may contain sensitive information
1185            # (password) and may be too long
1186            debug_log = 'execute program %r' % program
1187            self._log_subprocess(debug_log, stdin, stdout, stderr)
1188        transport = yield from self._make_subprocess_transport(
1189            protocol, popen_args, False, stdin, stdout, stderr,
1190            bufsize, **kwargs)
1191        if self._debug:
1192            logger.info('%s: %r', debug_log, transport)
1193        return transport, protocol
1194
1195    def get_exception_handler(self):
1196        """Return an exception handler, or None if the default one is in use.
1197        """
1198        return self._exception_handler
1199
1200    def set_exception_handler(self, handler):
1201        """Set handler as the new event loop exception handler.
1202
1203        If handler is None, the default exception handler will
1204        be set.
1205
1206        If handler is a callable object, it should have a
1207        signature matching '(loop, context)', where 'loop'
1208        will be a reference to the active event loop, 'context'
1209        will be a dict object (see `call_exception_handler()`
1210        documentation for details about context).
1211        """
1212        if handler is not None and not callable(handler):
1213            raise TypeError('A callable object or None is expected, '
1214                            'got {!r}'.format(handler))
1215        self._exception_handler = handler
1216
1217    def default_exception_handler(self, context):
1218        """Default exception handler.
1219
1220        This is called when an exception occurs and no exception
1221        handler is set, and can be called by a custom exception
1222        handler that wants to defer to the default behavior.
1223
1224        The context parameter has the same meaning as in
1225        `call_exception_handler()`.
1226        """
1227        message = context.get('message')
1228        if not message:
1229            message = 'Unhandled exception in event loop'
1230
1231        exception = context.get('exception')
1232        if exception is not None:
1233            exc_info = (type(exception), exception, exception.__traceback__)
1234        else:
1235            exc_info = False
1236
1237        if ('source_traceback' not in context
1238        and self._current_handle is not None
1239        and self._current_handle._source_traceback):
1240            context['handle_traceback'] = self._current_handle._source_traceback
1241
1242        log_lines = [message]
1243        for key in sorted(context):
1244            if key in {'message', 'exception'}:
1245                continue
1246            value = context[key]
1247            if key == 'source_traceback':
1248                tb = ''.join(traceback.format_list(value))
1249                value = 'Object created at (most recent call last):\n'
1250                value += tb.rstrip()
1251            elif key == 'handle_traceback':
1252                tb = ''.join(traceback.format_list(value))
1253                value = 'Handle created at (most recent call last):\n'
1254                value += tb.rstrip()
1255            else:
1256                value = repr(value)
1257            log_lines.append('{}: {}'.format(key, value))
1258
1259        logger.error('\n'.join(log_lines), exc_info=exc_info)
1260
1261    def call_exception_handler(self, context):
1262        """Call the current event loop's exception handler.
1263
1264        The context argument is a dict containing the following keys:
1265
1266        - 'message': Error message;
1267        - 'exception' (optional): Exception object;
1268        - 'future' (optional): Future instance;
1269        - 'handle' (optional): Handle instance;
1270        - 'protocol' (optional): Protocol instance;
1271        - 'transport' (optional): Transport instance;
1272        - 'socket' (optional): Socket instance;
1273        - 'asyncgen' (optional): Asynchronous generator that caused
1274                                 the exception.
1275
1276        New keys maybe introduced in the future.
1277
1278        Note: do not overload this method in an event loop subclass.
1279        For custom exception handling, use the
1280        `set_exception_handler()` method.
1281        """
1282        if self._exception_handler is None:
1283            try:
1284                self.default_exception_handler(context)
1285            except Exception:
1286                # Second protection layer for unexpected errors
1287                # in the default implementation, as well as for subclassed
1288                # event loops with overloaded "default_exception_handler".
1289                logger.error('Exception in default exception handler',
1290                             exc_info=True)
1291        else:
1292            try:
1293                self._exception_handler(self, context)
1294            except Exception as exc:
1295                # Exception in the user set custom exception handler.
1296                try:
1297                    # Let's try default handler.
1298                    self.default_exception_handler({
1299                        'message': 'Unhandled error in exception handler',
1300                        'exception': exc,
1301                        'context': context,
1302                    })
1303                except Exception:
1304                    # Guard 'default_exception_handler' in case it is
1305                    # overloaded.
1306                    logger.error('Exception in default exception handler '
1307                                 'while handling an unexpected error '
1308                                 'in custom exception handler',
1309                                 exc_info=True)
1310
1311    def _add_callback(self, handle):
1312        """Add a Handle to _scheduled (TimerHandle) or _ready."""
1313        assert isinstance(handle, events.Handle), 'A Handle is required here'
1314        if handle._cancelled:
1315            return
1316        assert not isinstance(handle, events.TimerHandle)
1317        self._ready.append(handle)
1318
1319    def _add_callback_signalsafe(self, handle):
1320        """Like _add_callback() but called from a signal handler."""
1321        self._add_callback(handle)
1322        self._write_to_self()
1323
1324    def _timer_handle_cancelled(self, handle):
1325        """Notification that a TimerHandle has been cancelled."""
1326        if handle._scheduled:
1327            self._timer_cancelled_count += 1
1328
    def _run_once(self):
        """Run one full iteration of the event loop.

        The steps, in order: discard cancelled timers, poll the selector
        for I/O (with a timeout derived from pending work), process the
        resulting events, move timers that are due into the ready queue,
        and finally run the callbacks that are in the ready queue at that
        point.  Callbacks scheduled while those run are left for the next
        iteration.
        """

        # Step 1: get rid of cancelled timer handles.
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            # Filtering may have broken the heap invariant; restore it.
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        # Step 2: choose the poll timeout.  None (nothing ready, nothing
        # scheduled) is passed to select() as-is.
        timeout = None
        if self._ready or self._stopping:
            # Work is already pending (or the loop is stopping): don't wait.
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        # Step 3: poll for I/O.  In debug mode a poll with a non-zero
        # timeout is timed and reported.
        if self._debug and timeout != 0:
            t0 = self.time()
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            # Polls of one second or more are logged at INFO level,
            # shorter ones at DEBUG level.
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                # A poll that timed out without events is only reported
                # when it took at least one second.
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        # Timers due within one clock resolution from now count as ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    # Expose the running handle (default_exception_handler
                    # reads it) and time the callback.
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
1427
1428    def _set_coroutine_wrapper(self, enabled):
1429        try:
1430            set_wrapper = sys.set_coroutine_wrapper
1431            get_wrapper = sys.get_coroutine_wrapper
1432        except AttributeError:
1433            return
1434
1435        enabled = bool(enabled)
1436        if self._coroutine_wrapper_set == enabled:
1437            return
1438
1439        wrapper = coroutines.debug_wrapper
1440        current_wrapper = get_wrapper()
1441
1442        if enabled:
1443            if current_wrapper not in (None, wrapper):
1444                warnings.warn(
1445                    "loop.set_debug(True): cannot set debug coroutine "
1446                    "wrapper; another wrapper is already set %r" %
1447                    current_wrapper, RuntimeWarning)
1448            else:
1449                set_wrapper(wrapper)
1450                self._coroutine_wrapper_set = True
1451        else:
1452            if current_wrapper not in (None, wrapper):
1453                warnings.warn(
1454                    "loop.set_debug(False): cannot unset debug coroutine "
1455                    "wrapper; another wrapper was set %r" %
1456                    current_wrapper, RuntimeWarning)
1457            else:
1458                set_wrapper(None)
1459                self._coroutine_wrapper_set = False
1460
1461    def get_debug(self):
1462        return self._debug
1463
1464    def set_debug(self, enabled):
1465        self._debug = enabled
1466
1467        if self.is_running():
1468            self._set_coroutine_wrapper(enabled)
1469