• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selectors module.
2
3This module allows high-level and efficient I/O multiplexing, built upon the
4`select` module primitives.
5"""
6
7
8from abc import ABCMeta, abstractmethod
9from collections import namedtuple
10from collections.abc import Mapping
11import math
12import select
13import sys
14
15
16# generic events, that must be mapped to implementation-specific ones
17EVENT_READ = (1 << 0)
18EVENT_WRITE = (1 << 1)
19
20
21def _fileobj_to_fd(fileobj):
22    """Return a file descriptor from a file object.
23
24    Parameters:
25    fileobj -- file object or file descriptor
26
27    Returns:
28    corresponding file descriptor
29
30    Raises:
31    ValueError if the object is invalid
32    """
33    if isinstance(fileobj, int):
34        fd = fileobj
35    else:
36        try:
37            fd = int(fileobj.fileno())
38        except (AttributeError, TypeError, ValueError):
39            raise ValueError("Invalid file object: "
40                             "{!r}".format(fileobj)) from None
41    if fd < 0:
42        raise ValueError("Invalid file descriptor: {}".format(fd))
43    return fd
44
45
46SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
47
48SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)
49
50    Object used to associate a file object to its backing
51    file descriptor, selected event mask, and attached data.
52"""
53if sys.version_info >= (3, 5):
54    SelectorKey.fileobj.__doc__ = 'File object registered.'
55    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
56    SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
57    SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
58    For example, this could be used to store a per-client session ID.''')
59
60
61class _SelectorMapping(Mapping):
62    """Mapping of file objects to selector keys."""
63
64    def __init__(self, selector):
65        self._selector = selector
66
67    def __len__(self):
68        return len(self._selector._fd_to_key)
69
70    def __getitem__(self, fileobj):
71        try:
72            fd = self._selector._fileobj_lookup(fileobj)
73            return self._selector._fd_to_key[fd]
74        except KeyError:
75            raise KeyError("{!r} is not registered".format(fileobj)) from None
76
77    def __iter__(self):
78        return iter(self._selector._fd_to_key)
79
80
81class BaseSelector(metaclass=ABCMeta):
82    """Selector abstract base class.
83
84    A selector supports registering file objects to be monitored for specific
85    I/O events.
86
87    A file object is a file descriptor or any object with a `fileno()` method.
88    An arbitrary object can be attached to the file object, which can be used
89    for example to store context information, a callback, etc.
90
91    A selector can use various implementations (select(), poll(), epoll()...)
92    depending on the platform. The default `Selector` class uses the most
93    efficient implementation on the current platform.
94    """
95
96    @abstractmethod
97    def register(self, fileobj, events, data=None):
98        """Register a file object.
99
100        Parameters:
101        fileobj -- file object or file descriptor
102        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
103        data    -- attached data
104
105        Returns:
106        SelectorKey instance
107
108        Raises:
109        ValueError if events is invalid
110        KeyError if fileobj is already registered
111        OSError if fileobj is closed or otherwise is unacceptable to
112                the underlying system call (if a system call is made)
113
114        Note:
115        OSError may or may not be raised
116        """
117        raise NotImplementedError
118
119    @abstractmethod
120    def unregister(self, fileobj):
121        """Unregister a file object.
122
123        Parameters:
124        fileobj -- file object or file descriptor
125
126        Returns:
127        SelectorKey instance
128
129        Raises:
130        KeyError if fileobj is not registered
131
132        Note:
133        If fileobj is registered but has since been closed this does
134        *not* raise OSError (even if the wrapped syscall does)
135        """
136        raise NotImplementedError
137
138    def modify(self, fileobj, events, data=None):
139        """Change a registered file object monitored events or attached data.
140
141        Parameters:
142        fileobj -- file object or file descriptor
143        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
144        data    -- attached data
145
146        Returns:
147        SelectorKey instance
148
149        Raises:
150        Anything that unregister() or register() raises
151        """
152        self.unregister(fileobj)
153        return self.register(fileobj, events, data)
154
155    @abstractmethod
156    def select(self, timeout=None):
157        """Perform the actual selection, until some monitored file objects are
158        ready or a timeout expires.
159
160        Parameters:
161        timeout -- if timeout > 0, this specifies the maximum wait time, in
162                   seconds
163                   if timeout <= 0, the select() call won't block, and will
164                   report the currently ready file objects
165                   if timeout is None, select() will block until a monitored
166                   file object becomes ready
167
168        Returns:
169        list of (key, events) for ready file objects
170        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
171        """
172        raise NotImplementedError
173
174    def close(self):
175        """Close the selector.
176
177        This must be called to make sure that any underlying resource is freed.
178        """
179        pass
180
181    def get_key(self, fileobj):
182        """Return the key associated to a registered file object.
183
184        Returns:
185        SelectorKey for this file object
186        """
187        mapping = self.get_map()
188        if mapping is None:
189            raise RuntimeError('Selector is closed')
190        try:
191            return mapping[fileobj]
192        except KeyError:
193            raise KeyError("{!r} is not registered".format(fileobj)) from None
194
195    @abstractmethod
196    def get_map(self):
197        """Return a mapping of file objects to selector keys."""
198        raise NotImplementedError
199
200    def __enter__(self):
201        return self
202
203    def __exit__(self, *args):
204        self.close()
205
206
207class _BaseSelectorImpl(BaseSelector):
208    """Base selector implementation."""
209
210    def __init__(self):
211        # this maps file descriptors to keys
212        self._fd_to_key = {}
213        # read-only mapping returned by get_map()
214        self._map = _SelectorMapping(self)
215
216    def _fileobj_lookup(self, fileobj):
217        """Return a file descriptor from a file object.
218
219        This wraps _fileobj_to_fd() to do an exhaustive search in case
220        the object is invalid but we still have it in our map.  This
221        is used by unregister() so we can unregister an object that
222        was previously registered even if it is closed.  It is also
223        used by _SelectorMapping.
224        """
225        try:
226            return _fileobj_to_fd(fileobj)
227        except ValueError:
228            # Do an exhaustive search.
229            for key in self._fd_to_key.values():
230                if key.fileobj is fileobj:
231                    return key.fd
232            # Raise ValueError after all.
233            raise
234
235    def register(self, fileobj, events, data=None):
236        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
237            raise ValueError("Invalid events: {!r}".format(events))
238
239        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
240
241        if key.fd in self._fd_to_key:
242            raise KeyError("{!r} (FD {}) is already registered"
243                           .format(fileobj, key.fd))
244
245        self._fd_to_key[key.fd] = key
246        return key
247
248    def unregister(self, fileobj):
249        try:
250            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
251        except KeyError:
252            raise KeyError("{!r} is not registered".format(fileobj)) from None
253        return key
254
255    def modify(self, fileobj, events, data=None):
256        try:
257            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
258        except KeyError:
259            raise KeyError("{!r} is not registered".format(fileobj)) from None
260        if events != key.events:
261            self.unregister(fileobj)
262            key = self.register(fileobj, events, data)
263        elif data != key.data:
264            # Use a shortcut to update the data.
265            key = key._replace(data=data)
266            self._fd_to_key[key.fd] = key
267        return key
268
269    def close(self):
270        self._fd_to_key.clear()
271        self._map = None
272
273    def get_map(self):
274        return self._map
275
276    def _key_from_fd(self, fd):
277        """Return the key associated to a given file descriptor.
278
279        Parameters:
280        fd -- file descriptor
281
282        Returns:
283        corresponding key, or None if not found
284        """
285        try:
286            return self._fd_to_key[fd]
287        except KeyError:
288            return None
289
290
291class SelectSelector(_BaseSelectorImpl):
292    """Select-based selector."""
293
294    def __init__(self):
295        super().__init__()
296        self._readers = set()
297        self._writers = set()
298
299    def register(self, fileobj, events, data=None):
300        key = super().register(fileobj, events, data)
301        if events & EVENT_READ:
302            self._readers.add(key.fd)
303        if events & EVENT_WRITE:
304            self._writers.add(key.fd)
305        return key
306
307    def unregister(self, fileobj):
308        key = super().unregister(fileobj)
309        self._readers.discard(key.fd)
310        self._writers.discard(key.fd)
311        return key
312
313    if sys.platform == 'win32':
314        def _select(self, r, w, _, timeout=None):
315            r, w, x = select.select(r, w, w, timeout)
316            return r, w + x, []
317    else:
318        _select = select.select
319
320    def select(self, timeout=None):
321        timeout = None if timeout is None else max(timeout, 0)
322        ready = []
323        try:
324            r, w, _ = self._select(self._readers, self._writers, [], timeout)
325        except InterruptedError:
326            return ready
327        r = set(r)
328        w = set(w)
329        for fd in r | w:
330            events = 0
331            if fd in r:
332                events |= EVENT_READ
333            if fd in w:
334                events |= EVENT_WRITE
335
336            key = self._key_from_fd(fd)
337            if key:
338                ready.append((key, events & key.events))
339        return ready
340
341
342class _PollLikeSelector(_BaseSelectorImpl):
343    """Base class shared between poll, epoll and devpoll selectors."""
344    _selector_cls = None
345    _EVENT_READ = None
346    _EVENT_WRITE = None
347
348    def __init__(self):
349        super().__init__()
350        self._selector = self._selector_cls()
351
352    def register(self, fileobj, events, data=None):
353        key = super().register(fileobj, events, data)
354        poller_events = 0
355        if events & EVENT_READ:
356            poller_events |= self._EVENT_READ
357        if events & EVENT_WRITE:
358            poller_events |= self._EVENT_WRITE
359        try:
360            self._selector.register(key.fd, poller_events)
361        except:
362            super().unregister(fileobj)
363            raise
364        return key
365
366    def unregister(self, fileobj):
367        key = super().unregister(fileobj)
368        try:
369            self._selector.unregister(key.fd)
370        except OSError:
371            # This can happen if the FD was closed since it
372            # was registered.
373            pass
374        return key
375
376    def modify(self, fileobj, events, data=None):
377        try:
378            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
379        except KeyError:
380            raise KeyError(f"{fileobj!r} is not registered") from None
381
382        changed = False
383        if events != key.events:
384            selector_events = 0
385            if events & EVENT_READ:
386                selector_events |= self._EVENT_READ
387            if events & EVENT_WRITE:
388                selector_events |= self._EVENT_WRITE
389            try:
390                self._selector.modify(key.fd, selector_events)
391            except:
392                super().unregister(fileobj)
393                raise
394            changed = True
395        if data != key.data:
396            changed = True
397
398        if changed:
399            key = key._replace(events=events, data=data)
400            self._fd_to_key[key.fd] = key
401        return key
402
403    def select(self, timeout=None):
404        # This is shared between poll() and epoll().
405        # epoll() has a different signature and handling of timeout parameter.
406        if timeout is None:
407            timeout = None
408        elif timeout <= 0:
409            timeout = 0
410        else:
411            # poll() has a resolution of 1 millisecond, round away from
412            # zero to wait *at least* timeout seconds.
413            timeout = math.ceil(timeout * 1e3)
414        ready = []
415        try:
416            fd_event_list = self._selector.poll(timeout)
417        except InterruptedError:
418            return ready
419        for fd, event in fd_event_list:
420            events = 0
421            if event & ~self._EVENT_READ:
422                events |= EVENT_WRITE
423            if event & ~self._EVENT_WRITE:
424                events |= EVENT_READ
425
426            key = self._key_from_fd(fd)
427            if key:
428                ready.append((key, events & key.events))
429        return ready
430
431
432if hasattr(select, 'poll'):
433
434    class PollSelector(_PollLikeSelector):
435        """Poll-based selector."""
436        _selector_cls = select.poll
437        _EVENT_READ = select.POLLIN
438        _EVENT_WRITE = select.POLLOUT
439
440
441if hasattr(select, 'epoll'):
442
443    class EpollSelector(_PollLikeSelector):
444        """Epoll-based selector."""
445        _selector_cls = select.epoll
446        _EVENT_READ = select.EPOLLIN
447        _EVENT_WRITE = select.EPOLLOUT
448
449        def fileno(self):
450            return self._selector.fileno()
451
452        def select(self, timeout=None):
453            if timeout is None:
454                timeout = -1
455            elif timeout <= 0:
456                timeout = 0
457            else:
458                # epoll_wait() has a resolution of 1 millisecond, round away
459                # from zero to wait *at least* timeout seconds.
460                timeout = math.ceil(timeout * 1e3) * 1e-3
461
462            # epoll_wait() expects `maxevents` to be greater than zero;
463            # we want to make sure that `select()` can be called when no
464            # FD is registered.
465            max_ev = max(len(self._fd_to_key), 1)
466
467            ready = []
468            try:
469                fd_event_list = self._selector.poll(timeout, max_ev)
470            except InterruptedError:
471                return ready
472            for fd, event in fd_event_list:
473                events = 0
474                if event & ~select.EPOLLIN:
475                    events |= EVENT_WRITE
476                if event & ~select.EPOLLOUT:
477                    events |= EVENT_READ
478
479                key = self._key_from_fd(fd)
480                if key:
481                    ready.append((key, events & key.events))
482            return ready
483
484        def close(self):
485            self._selector.close()
486            super().close()
487
488
489if hasattr(select, 'devpoll'):
490
491    class DevpollSelector(_PollLikeSelector):
492        """Solaris /dev/poll selector."""
493        _selector_cls = select.devpoll
494        _EVENT_READ = select.POLLIN
495        _EVENT_WRITE = select.POLLOUT
496
497        def fileno(self):
498            return self._selector.fileno()
499
500        def close(self):
501            self._selector.close()
502            super().close()
503
504
505if hasattr(select, 'kqueue'):
506
507    class KqueueSelector(_BaseSelectorImpl):
508        """Kqueue-based selector."""
509
510        def __init__(self):
511            super().__init__()
512            self._selector = select.kqueue()
513
514        def fileno(self):
515            return self._selector.fileno()
516
517        def register(self, fileobj, events, data=None):
518            key = super().register(fileobj, events, data)
519            try:
520                if events & EVENT_READ:
521                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
522                                        select.KQ_EV_ADD)
523                    self._selector.control([kev], 0, 0)
524                if events & EVENT_WRITE:
525                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
526                                        select.KQ_EV_ADD)
527                    self._selector.control([kev], 0, 0)
528            except:
529                super().unregister(fileobj)
530                raise
531            return key
532
533        def unregister(self, fileobj):
534            key = super().unregister(fileobj)
535            if key.events & EVENT_READ:
536                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
537                                    select.KQ_EV_DELETE)
538                try:
539                    self._selector.control([kev], 0, 0)
540                except OSError:
541                    # This can happen if the FD was closed since it
542                    # was registered.
543                    pass
544            if key.events & EVENT_WRITE:
545                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
546                                    select.KQ_EV_DELETE)
547                try:
548                    self._selector.control([kev], 0, 0)
549                except OSError:
550                    # See comment above.
551                    pass
552            return key
553
554        def select(self, timeout=None):
555            timeout = None if timeout is None else max(timeout, 0)
556            # If max_ev is 0, kqueue will ignore the timeout. For consistent
557            # behavior with the other selector classes, we prevent that here
558            # (using max). See https://bugs.python.org/issue29255
559            max_ev = max(len(self._fd_to_key), 1)
560            ready = []
561            try:
562                kev_list = self._selector.control(None, max_ev, timeout)
563            except InterruptedError:
564                return ready
565            for kev in kev_list:
566                fd = kev.ident
567                flag = kev.filter
568                events = 0
569                if flag == select.KQ_FILTER_READ:
570                    events |= EVENT_READ
571                if flag == select.KQ_FILTER_WRITE:
572                    events |= EVENT_WRITE
573
574                key = self._key_from_fd(fd)
575                if key:
576                    ready.append((key, events & key.events))
577            return ready
578
579        def close(self):
580            self._selector.close()
581            super().close()
582
583
584def _can_use(method):
585    """Check if we can use the selector depending upon the
586    operating system. """
587    # Implementation based upon https://github.com/sethmlarson/selectors2/blob/master/selectors2.py
588    selector = getattr(select, method, None)
589    if selector is None:
590        # select module does not implement method
591        return False
592    # check if the OS and Kernel actually support the method. Call may fail with
593    # OSError: [Errno 38] Function not implemented
594    try:
595        selector_obj = selector()
596        if method == 'poll':
597            # check that poll actually works
598            selector_obj.poll(0)
599        else:
600            # close epoll, kqueue, and devpoll fd
601            selector_obj.close()
602        return True
603    except OSError:
604        return False
605
606
607# Choose the best implementation, roughly:
608#    epoll|kqueue|devpoll > poll > select.
609# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
610if _can_use('kqueue'):
611    DefaultSelector = KqueueSelector
612elif _can_use('epoll'):
613    DefaultSelector = EpollSelector
614elif _can_use('devpoll'):
615    DefaultSelector = DevpollSelector
616elif _can_use('poll'):
617    DefaultSelector = PollSelector
618else:
619    DefaultSelector = SelectSelector
620