• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selectors module.
2
3This module allows high-level and efficient I/O multiplexing, built upon the
4`select` module primitives.
5"""
6
7
8from abc import ABCMeta, abstractmethod
9from collections import namedtuple
10from collections.abc import Mapping
11import math
12import select
13import sys
14
15
# Portable event flags; every selector implementation maps these bits onto
# the constants of its own underlying mechanism.
EVENT_READ = 1 << 0
EVENT_WRITE = 1 << 1
19
20
21def _fileobj_to_fd(fileobj):
22    """Return a file descriptor from a file object.
23
24    Parameters:
25    fileobj -- file object or file descriptor
26
27    Returns:
28    corresponding file descriptor
29
30    Raises:
31    ValueError if the object is invalid
32    """
33    if isinstance(fileobj, int):
34        fd = fileobj
35    else:
36        try:
37            fd = int(fileobj.fileno())
38        except (AttributeError, TypeError, ValueError):
39            raise ValueError("Invalid file object: "
40                             "{!r}".format(fileobj)) from None
41    if fd < 0:
42        raise ValueError("Invalid file descriptor: {}".format(fd))
43    return fd
44
45
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)

    Object used to associate a file object to its backing
    file descriptor, selected event mask, and attached data.
"""
# Give each generated field accessor its own docstring.
for _field_name, _field_doc in (
        ('fileobj', 'File object registered.'),
        ('fd', 'Underlying file descriptor.'),
        ('events', 'Events that must be waited for on this file object.'),
        ('data', 'Optional opaque data associated to this file object.\n'
                 'For example, this could be used to store a per-client '
                 'session ID.'),
):
    getattr(SelectorKey, _field_name).__doc__ = _field_doc
# Do not leave the loop variables behind in the module namespace.
del _field_name, _field_doc
58
59
60class _SelectorMapping(Mapping):
61    """Mapping of file objects to selector keys."""
62
63    def __init__(self, selector):
64        self._selector = selector
65
66    def __len__(self):
67        return len(self._selector._fd_to_key)
68
69    def get(self, fileobj, default=None):
70        fd = self._selector._fileobj_lookup(fileobj)
71        return self._selector._fd_to_key.get(fd, default)
72
73    def __getitem__(self, fileobj):
74        fd = self._selector._fileobj_lookup(fileobj)
75        key = self._selector._fd_to_key.get(fd)
76        if key is None:
77            raise KeyError("{!r} is not registered".format(fileobj))
78        return key
79
80    def __iter__(self):
81        return iter(self._selector._fd_to_key)
82
83
class BaseSelector(metaclass=ABCMeta):
    """Abstract interface implemented by every selector.

    A selector keeps track of registered file objects and reports which of
    them are ready for the I/O events they were registered for.

    A file object is either a file descriptor or any object exposing a
    `fileno()` method.  An arbitrary object can be attached to each
    registration, for example context information or a callback.

    Several implementations exist (select(), poll(), epoll()...), and their
    availability depends on the platform.  `DefaultSelector` is bound to the
    preferred implementation usable on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Start monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Stop monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Replace the monitored events or attached data of a registration.

        The default implementation simply unregisters the file object and
        registers it again with the new arguments.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        new_key = self.register(fileobj, events, data)
        return new_key

    @abstractmethod
    def select(self, timeout=None):
        """Wait until some monitored file objects are ready or the timeout
        expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is
        freed.  The base implementation has nothing to release.
        """

    def get_key(self, fileobj):
        """Return the key associated to a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        registry = self.get_map()
        if registry is None:
            raise RuntimeError('Selector is closed')
        try:
            found = registry[fileobj]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return found

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Leaving the `with` block always closes the selector.
        self.close()
208
209
class _BaseSelectorImpl(BaseSelector):
    """Bookkeeping shared by the concrete selectors.

    Registrations are stored in a dict keyed by file descriptor; the
    read-only view returned by get_map() is built on top of that dict.
    """

    def __init__(self):
        # file descriptor -> SelectorKey
        self._fd_to_key = {}
        # read-only view handed out by get_map(); reset to None by close()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        Like _fileobj_to_fd(), but when the object is invalid the registered
        keys are searched for that very object (by identity).  This lets
        unregister() remove an object that was registered earlier even if it
        has been closed since.  _SelectorMapping relies on it as well.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Fall back to an exhaustive search of the registered keys.
            for registered in self._fd_to_key.values():
                if registered.fileobj is fileobj:
                    return registered.fd
            # Unknown object: let the original ValueError propagate.
            raise

    def register(self, fileobj, events, data=None):
        valid_mask = EVENT_READ | EVENT_WRITE
        if (not events) or (events & ~valid_mask):
            raise ValueError("Invalid events: {!r}".format(events))

        fd = self._fileobj_lookup(fileobj)
        new_key = SelectorKey(fileobj, fd, events, data)

        if fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, fd))

        self._fd_to_key[fd] = new_key
        return new_key

    def unregister(self, fileobj):
        try:
            return self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def modify(self, fileobj, events, data=None):
        try:
            current = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

        if events != current.events:
            # A different event mask needs a full re-registration.
            self.unregister(fileobj)
            return self.register(fileobj, events, data)

        if data != current.data:
            # Only the attached data changed: swap the key in place.
            current = current._replace(data=data)
            self._fd_to_key[current.fd] = current
        return current

    def close(self):
        self._fd_to_key.clear()
        self._map = None

    def get_map(self):
        return self._map
278
279
280
class SelectSelector(_BaseSelectorImpl):
    """Selector built on select.select()."""

    def __init__(self):
        super().__init__()
        # Descriptors currently monitored for reading / for writing.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        fd = key.fd
        if events & EVENT_READ:
            self._readers.add(fd)
        if events & EVENT_WRITE:
            self._writers.add(fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        for monitored in (self._readers, self._writers):
            monitored.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On Windows the write set is also passed as the exceptional
            # set, and whatever comes back there is reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []
    else:
        _select = select.select

    def select(self, timeout=None):
        if timeout is not None:
            # Negative timeouts are clamped to zero.
            timeout = max(timeout, 0)
        try:
            readable, writable, _ = self._select(self._readers,
                                                 self._writers, [], timeout)
        except InterruptedError:
            return []

        readable = frozenset(readable)
        writable = frozenset(writable)
        lookup = self._fd_to_key.get
        ready = []
        for fd in readable | writable:
            key = lookup(fd)
            if key:
                mask = 0
                if fd in readable:
                    mask |= EVENT_READ
                if fd in writable:
                    mask |= EVENT_WRITE
                # Report only the events this key was registered for.
                ready.append((key, mask & key.events))
        return ready
328
329
class _PollLikeSelector(_BaseSelectorImpl):
    """Base class shared between poll, epoll and devpoll selectors.

    Subclasses set `_selector_cls` (called without arguments to create the
    underlying poller object) and `_EVENT_READ` / `_EVENT_WRITE` (the
    poller flags that correspond to EVENT_READ / EVENT_WRITE).
    """
    _selector_cls = None
    _EVENT_READ = None
    _EVENT_WRITE = None

    def __init__(self):
        super().__init__()
        self._selector = self._selector_cls()

    def _poller_events(self, events):
        """Translate an EVENT_READ|EVENT_WRITE mask into poller flags."""
        return ((events & EVENT_READ and self._EVENT_READ)
                | (events & EVENT_WRITE and self._EVENT_WRITE))

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        poller_events = self._poller_events(events)
        try:
            self._selector.register(key.fd, poller_events)
        except BaseException:
            # The poller rejected the descriptor: drop the bookkeeping entry
            # added above, then let the error propagate.
            super().unregister(fileobj)
            raise
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        try:
            self._selector.unregister(key.fd)
        except OSError:
            # This can happen if the FD was closed since it
            # was registered.
            pass
        return key

    def modify(self, fileobj, events, data=None):
        """Change the monitored events or attached data of a registration.

        The poller's own modify() is used instead of unregistering and
        registering again.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered
        ValueError if events is invalid
        Anything the underlying poller's modify() raises; in that case the
        file object is removed from this selector's map
        """
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

        changed = False
        if events != key.events:
            # Apply the same mask validation as register(), so an empty
            # mask or unknown bits are rejected instead of being stored in
            # the key.
            if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
                raise ValueError("Invalid events: {!r}".format(events))
            selector_events = self._poller_events(events)
            try:
                self._selector.modify(key.fd, selector_events)
            except BaseException:
                super().unregister(fileobj)
                raise
            changed = True
        if data != key.data:
            changed = True

        if changed:
            key = key._replace(events=events, data=data)
            self._fd_to_key[key.fd] = key
        return key

    def select(self, timeout=None):
        # This is shared between poll() and devpoll(); the epoll selector
        # overrides it because its timeout is handled differently.
        if timeout is not None:
            if timeout <= 0:
                timeout = 0
            else:
                # poll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
        ready = []
        try:
            fd_event_list = self._selector.poll(timeout)
        except InterruptedError:
            return ready

        fd_to_key_get = self._fd_to_key.get
        for fd, event in fd_event_list:
            key = fd_to_key_get(fd)
            if key:
                # Any flag other than the read flag marks the descriptor as
                # writable, and any flag other than the write flag marks it
                # as readable; a flag that is neither therefore sets both.
                events = ((event & ~self._EVENT_READ and EVENT_WRITE)
                          | (event & ~self._EVENT_WRITE and EVENT_READ))
                # Report only the events this key was registered for.
                ready.append((key, events & key.events))
        return ready
410
411
if hasattr(select, 'poll'):

    class PollSelector(_PollLikeSelector):
        """Selector built on select.poll()."""
        # Poller flags matching EVENT_READ / EVENT_WRITE.
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT
        # Factory for the underlying poller object.
        _selector_cls = select.poll
419
420
if hasattr(select, 'epoll'):

    # Complements of the epoll read/write flags, used when decoding the
    # events returned by poll().
    _NOT_EPOLLIN = ~select.EPOLLIN
    _NOT_EPOLLOUT = ~select.EPOLLOUT

    class EpollSelector(_PollLikeSelector):
        """Selector built on select.epoll()."""
        _EVENT_READ = select.EPOLLIN
        _EVENT_WRITE = select.EPOLLOUT
        _selector_cls = select.epoll

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._selector.fileno()

        def select(self, timeout=None):
            if timeout is None:
                # The epoll object is told to block with -1 rather than None.
                wait = -1
            elif timeout <= 0:
                wait = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                wait = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = len(self._fd_to_key) or 1

            try:
                polled = self._selector.poll(wait, max_ev)
            except InterruptedError:
                return []

            lookup = self._fd_to_key.get
            ready = []
            for fd, event in polled:
                key = lookup(fd)
                if not key:
                    continue
                mask = 0
                # Anything besides EPOLLIN counts as writable, anything
                # besides EPOLLOUT counts as readable.
                if event & _NOT_EPOLLIN:
                    mask |= EVENT_WRITE
                if event & _NOT_EPOLLOUT:
                    mask |= EVENT_READ
                ready.append((key, mask & key.events))
            return ready

        def close(self):
            # Close the epoll object first, then clear the bookkeeping.
            self._selector.close()
            super().close()
468
469
if hasattr(select, 'devpoll'):

    class DevpollSelector(_PollLikeSelector):
        """Selector built on select.devpoll() (Solaris /dev/poll)."""
        # Poller flags matching EVENT_READ / EVENT_WRITE.
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT
        # Factory for the underlying poller object.
        _selector_cls = select.devpoll

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._selector.fileno()

        def close(self):
            # Close the devpoll object first, then clear the bookkeeping.
            self._selector.close()
            super().close()
484
485
if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Selector built on select.kqueue()."""

        def __init__(self):
            super().__init__()
            self._selector = select.kqueue()
            # Number of kevent filters currently installed (one per
            # monitored direction of each registered descriptor).
            self._max_events = 0

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._selector.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            try:
                for event_flag, kq_filter in (
                        (EVENT_READ, select.KQ_FILTER_READ),
                        (EVENT_WRITE, select.KQ_FILTER_WRITE)):
                    if events & event_flag:
                        kev = select.kevent(key.fd, kq_filter,
                                            select.KQ_EV_ADD)
                        self._selector.control([kev], 0, 0)
                        self._max_events += 1
            except:
                # Installing a filter failed: drop the bookkeeping entry
                # added above, then let the error propagate.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            for event_flag, kq_filter in (
                    (EVENT_READ, select.KQ_FILTER_READ),
                    (EVENT_WRITE, select.KQ_FILTER_WRITE)):
                if key.events & event_flag:
                    kev = select.kevent(key.fd, kq_filter,
                                        select.KQ_EV_DELETE)
                    self._max_events -= 1
                    try:
                        self._selector.control([kev], 0, 0)
                    except OSError:
                        # This can happen if the FD was closed since it
                        # was registered.
                        pass
            return key

        def select(self, timeout=None):
            if timeout is not None:
                # Negative timeouts are clamped to zero.
                timeout = max(timeout, 0)
            # If max_ev is 0, kqueue will ignore the timeout. For consistent
            # behavior with the other selector classes, we prevent that here
            # by asking for at least one event.
            # See https://bugs.python.org/issue29255
            max_ev = self._max_events or 1
            try:
                kev_list = self._selector.control(None, max_ev, timeout)
            except InterruptedError:
                return []

            lookup = self._fd_to_key.get
            ready = []
            for kev in kev_list:
                key = lookup(kev.ident)
                if key:
                    kq_filter = kev.filter
                    mask = 0
                    if kq_filter == select.KQ_FILTER_READ:
                        mask |= EVENT_READ
                    if kq_filter == select.KQ_FILTER_WRITE:
                        mask |= EVENT_WRITE
                    # Report only the events this key was registered for.
                    ready.append((key, mask & key.events))
            return ready

        def close(self):
            # Close the kqueue object first, then clear the bookkeeping.
            self._selector.close()
            super().close()
566
567
568def _can_use(method):
569    """Check if we can use the selector depending upon the
570    operating system. """
571    # Implementation based upon https://github.com/sethmlarson/selectors2/blob/master/selectors2.py
572    selector = getattr(select, method, None)
573    if selector is None:
574        # select module does not implement method
575        return False
576    # check if the OS and Kernel actually support the method. Call may fail with
577    # OSError: [Errno 38] Function not implemented
578    try:
579        selector_obj = selector()
580        if method == 'poll':
581            # check that poll actually works
582            selector_obj.poll(0)
583        else:
584            # close epoll, kqueue, and devpoll fd
585            selector_obj.close()
586        return True
587    except OSError:
588        return False
589
590
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
#
# Candidates are probed in order and the first usable one wins.  Selector
# classes are looked up by name because a class only exists when the
# `select` module provides the matching primitive.
for _method, _selector_name in (
        ('kqueue', 'KqueueSelector'),
        ('epoll', 'EpollSelector'),
        ('devpoll', 'DevpollSelector'),
        ('poll', 'PollSelector'),
):
    if _can_use(_method):
        DefaultSelector = globals()[_selector_name]
        break
else:
    # Nothing better is usable: fall back to select().
    DefaultSelector = SelectSelector
# Do not leave the loop variables behind in the module namespace.
del _method, _selector_name
604