• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selectors module.
2
3This module allows high-level and efficient I/O multiplexing, built upon the
4`select` module primitives.
5"""
6
7
8from abc import ABCMeta, abstractmethod
9from collections import namedtuple
10from collections.abc import Mapping
11import math
12import select
13import sys
14
15
# Generic, implementation-independent event flags.  Each selector maps
# them to whatever its underlying primitive uses.  They are bit flags and
# may be OR-ed together.
EVENT_READ = 0b01
EVENT_WRITE = 0b10
19
20
21def _fileobj_to_fd(fileobj):
22    """Return a file descriptor from a file object.
23
24    Parameters:
25    fileobj -- file object or file descriptor
26
27    Returns:
28    corresponding file descriptor
29
30    Raises:
31    ValueError if the object is invalid
32    """
33    if isinstance(fileobj, int):
34        fd = fileobj
35    else:
36        try:
37            fd = int(fileobj.fileno())
38        except (AttributeError, TypeError, ValueError):
39            raise ValueError("Invalid file object: "
40                             "{!r}".format(fileobj)) from None
41    if fd < 0:
42        raise ValueError("Invalid file descriptor: {}".format(fd))
43    return fd
44
45
SelectorKey = namedtuple('SelectorKey', 'fileobj fd events data')

# Class-level documentation for the key tuple.
SelectorKey.__doc__ = (
    "SelectorKey(fileobj, fd, events, data)\n"
    "\n"
    "    Object used to associate a file object to its backing\n"
    "    file descriptor, selected event mask, and attached data.\n"
)
if sys.version_info >= (3, 5):
    # Per-field docstrings are only attached on Python 3.5 and later.
    SelectorKey.data.__doc__ = (
        'Optional opaque data associated to this file object.\n'
        '    For example, this could be used to store a per-client session ID.')
    SelectorKey.events.__doc__ = (
        'Events that must be waited for on this file object.')
    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
    SelectorKey.fileobj.__doc__ = 'File object registered.'
59
60class _SelectorMapping(Mapping):
61    """Mapping of file objects to selector keys."""
62
63    def __init__(self, selector):
64        self._selector = selector
65
66    def __len__(self):
67        return len(self._selector._fd_to_key)
68
69    def __getitem__(self, fileobj):
70        try:
71            fd = self._selector._fileobj_lookup(fileobj)
72            return self._selector._fd_to_key[fd]
73        except KeyError:
74            raise KeyError("{!r} is not registered".format(fileobj)) from None
75
76    def __iter__(self):
77        return iter(self._selector._fd_to_key)
78
79
class BaseSelector(metaclass=ABCMeta):
    """Abstract base class for selectors.

    A selector lets callers register file objects and then wait until
    some of them are ready for the requested I/O events.

    A file object is either a file descriptor or any object exposing a
    `fileno()` method.  An arbitrary object can be attached to each
    registration, e.g. to carry context information or a callback.

    Several implementations exist (select(), poll(), epoll()...),
    depending on the platform.  The module-level `DefaultSelector` name
    is bound to the preferred implementation available.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Start monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Stop monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Replace the monitored events or attached data of a registration.

        This generic version simply unregisters the file object and
        registers it again with the new settings.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        new_key = self.register(fileobj, events, data)
        return new_key

    @abstractmethod
    def select(self, timeout=None):
        """Wait until some monitored file objects are ready or the timeout
        expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is
        freed.  The base implementation does nothing.
        """

    def get_key(self, fileobj):
        """Look up the key of a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (selector closed)
        KeyError if fileobj is not registered
        """
        registered = self.get_map()
        if registered is None:
            raise RuntimeError('Selector is closed')
        try:
            key = registered[fileobj]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return key

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        # Context-manager support: the selector itself is the managed value.
        return self

    def __exit__(self, *args):
        # Always close on exit; exceptions are not suppressed.
        self.close()
204
205
class _BaseSelectorImpl(BaseSelector):
    """Bookkeeping shared by the concrete selectors.

    Keeps the fd -> SelectorKey table and the read-only mapping handed
    out by get_map(); subclasses add the actual polling primitive.
    """

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}
        # read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        Wraps _fileobj_to_fd(): when the object is invalid (for example
        because it has been closed) the registered keys are scanned for
        the very same object, so that it can still be unregistered.  It
        is also used by _SelectorMapping.

        Raises:
        ValueError if the object is invalid and was never registered
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Do an exhaustive search, by identity, among registered keys.
            not_found = object()
            fd = next((key.fd for key in self._fd_to_key.values()
                       if key.fileobj is fileobj), not_found)
            if fd is not_found:
                # Raise ValueError after all.
                raise
            return fd

    def register(self, fileobj, events, data=None):
        allowed = EVENT_READ | EVENT_WRITE
        # Reject an empty mask as well as any bit outside READ|WRITE.
        if not events or events & ~allowed:
            raise ValueError("Invalid events: {!r}".format(events))

        fd = self._fileobj_lookup(fileobj)
        key = SelectorKey(fileobj, fd, events, data)

        if fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, fd))

        self._fd_to_key[fd] = key
        return key

    def unregister(self, fileobj):
        try:
            removed = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return removed

    def modify(self, fileobj, events, data=None):
        try:
            current = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

        if events != current.events:
            # The event mask changed: go through a full re-registration.
            self.unregister(fileobj)
            return self.register(fileobj, events, data)

        if data != current.data:
            # Use a shortcut to update the data.
            current = current._replace(data=data)
            self._fd_to_key[current.fd] = current
        return current

    def close(self):
        self._fd_to_key.clear()
        # get_map() returns None from now on, which get_key() reports as
        # a closed selector.
        self._map = None

    def get_map(self):
        return self._map

    def _key_from_fd(self, fd):
        """Return the key associated to a given file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        return self._fd_to_key.get(fd)
288
289
class SelectSelector(_BaseSelectorImpl):
    """Selector built on select.select()."""

    def __init__(self):
        super().__init__()
        # Descriptors watched for readability / writability respectively.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        watch_sets = ((EVENT_READ, self._readers),
                      (EVENT_WRITE, self._writers))
        for flag, watched in watch_sets:
            if events & flag:
                watched.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        for watched in (self._readers, self._writers):
            watched.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On Windows the writers are also passed as the "exceptional"
            # set and whatever comes back there is reported as writable.
            # NOTE(review): presumably because some write-side conditions
            # are signalled through that set on this platform -- confirm.
            readable, writable, exceptional = select.select(r, w, w, timeout)
            return readable, writable + exceptional, []
    else:
        _select = select.select

    def select(self, timeout=None):
        # Negative timeouts mean "do not block".
        if timeout is not None:
            timeout = max(timeout, 0)
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return []

        readable = set(r)
        writable = set(w)
        ready = []
        for fd in readable | writable:
            events = ((EVENT_READ if fd in readable else 0)
                      | (EVENT_WRITE if fd in writable else 0))
            key = self._key_from_fd(fd)
            if key:
                # Only report events the caller actually asked for.
                ready.append((key, events & key.events))
        return ready
339
340
class _PollLikeSelector(_BaseSelectorImpl):
    """Common machinery for the poll, epoll and devpoll selectors.

    Concrete subclasses provide the poller factory and its native
    read/write event flags through the class attributes below.
    """
    _selector_cls = None
    _EVENT_READ = None
    _EVENT_WRITE = None

    def __init__(self):
        super().__init__()
        self._selector = self._selector_cls()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        # Translate the generic mask into the poller's own flags.
        native_mask = ((self._EVENT_READ if events & EVENT_READ else 0)
                       | (self._EVENT_WRITE if events & EVENT_WRITE else 0))
        try:
            self._selector.register(key.fd, native_mask)
        except:
            # Whatever went wrong, drop the bookkeeping entry again and
            # let the exception propagate.
            super().unregister(fileobj)
            raise
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        try:
            self._selector.unregister(key.fd)
        except OSError:
            # This can happen if the FD was closed since it
            # was registered.
            pass
        return key

    def modify(self, fileobj, events, data=None):
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

        events_changed = events != key.events
        if events_changed:
            native_mask = ((self._EVENT_READ if events & EVENT_READ else 0)
                           | (self._EVENT_WRITE if events & EVENT_WRITE else 0))
            try:
                self._selector.modify(key.fd, native_mask)
            except:
                # The poller rejected the change: forget the registration
                # and re-raise.
                super().unregister(fileobj)
                raise
        data_changed = data != key.data

        if events_changed or data_changed:
            key = key._replace(events=events, data=data)
            self._fd_to_key[key.fd] = key
        return key

    def select(self, timeout=None):
        # This is shared between poll() and epoll().
        # epoll() has a different signature and handling of timeout parameter.
        if timeout is not None:
            # poll() has a resolution of 1 millisecond, round away from
            # zero to wait *at least* timeout seconds.
            timeout = 0 if timeout <= 0 else math.ceil(timeout * 1e3)

        try:
            fd_event_list = self._selector.poll(timeout)
        except InterruptedError:
            return []

        ready = []
        for fd, event in fd_event_list:
            # Any flag other than the read flag counts as "writable" and
            # any flag other than the write flag counts as "readable".
            events = ((EVENT_WRITE if event & ~self._EVENT_READ else 0)
                      | (EVENT_READ if event & ~self._EVENT_WRITE else 0))
            key = self._key_from_fd(fd)
            if key:
                # Only report events the caller actually asked for.
                ready.append((key, events & key.events))
        return ready
429
430
if hasattr(select, 'poll'):

    class PollSelector(_PollLikeSelector):
        """Selector built on select.poll().

        Only defined when the `select` module provides `poll`.
        """
        # Native flags the generic EVENT_READ / EVENT_WRITE map to.
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT
        # Factory for the underlying poller object.
        _selector_cls = select.poll
438
439
if hasattr(select, 'epoll'):

    class EpollSelector(_PollLikeSelector):
        """Selector built on select.epoll().

        Only defined when the `select` module provides `epoll`.
        """
        # Native flags the generic EVENT_READ / EVENT_WRITE map to.
        _EVENT_READ = select.EPOLLIN
        _EVENT_WRITE = select.EPOLLOUT
        # Factory for the underlying epoll object.
        _selector_cls = select.epoll

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._selector.fileno()

        def select(self, timeout=None):
            if timeout is None:
                # epoll's way of saying "block indefinitely".
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = max(len(self._fd_to_key), 1)

            try:
                fd_event_list = self._selector.poll(timeout, max_ev)
            except InterruptedError:
                return []

            ready = []
            for fd, event in fd_event_list:
                # Any flag other than EPOLLIN counts as "writable" and any
                # flag other than EPOLLOUT counts as "readable".
                events = ((EVENT_WRITE if event & ~select.EPOLLIN else 0)
                          | (EVENT_READ if event & ~select.EPOLLOUT else 0))
                key = self._key_from_fd(fd)
                if key:
                    # Only report events the caller actually asked for.
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            # Release the epoll object first, then the bookkeeping.
            self._selector.close()
            super().close()
486
487
if hasattr(select, 'devpoll'):

    class DevpollSelector(_PollLikeSelector):
        """Solaris /dev/poll selector.

        Only defined when the `select` module provides `devpoll`.
        """
        # Native flags the generic EVENT_READ / EVENT_WRITE map to.
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT
        # Factory for the underlying /dev/poll object.
        _selector_cls = select.devpoll

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._selector.fileno()

        def close(self):
            # Release the devpoll object first, then the bookkeeping.
            self._selector.close()
            super().close()
502
503
if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector.

        Only defined when the `select` module provides `kqueue`.  Each
        requested generic event is installed as its own kqueue filter
        (KQ_FILTER_READ / KQ_FILTER_WRITE) on the file descriptor.
        """

        def __init__(self):
            super().__init__()
            self._selector = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._selector.fileno()

        def register(self, fileobj, events, data=None):
            """Register a file object and add one kqueue filter per event.

            Parameters:
            fileobj -- file object or file descriptor
            events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
            data    -- attached data

            Returns:
            SelectorKey instance

            Raises:
            ValueError / KeyError as raised by the base registration;
            anything raised while installing a filter is re-raised after
            the bookkeeping entry has been removed again.
            """
            key = super().register(fileobj, events, data)
            try:
                # control(changelist, 0, 0): apply the change only, do not
                # retrieve any event.
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
            except:
                # Roll back the bookkeeping entry whatever went wrong, then
                # let the original exception propagate.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister a file object and delete its kqueue filters.

            Parameters:
            fileobj -- file object or file descriptor

            Returns:
            SelectorKey instance

            Raises:
            KeyError if fileobj is not registered

            Note:
            OSError raised while deleting a filter is ignored.
            """
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            """Wait for events on the registered file objects.

            Parameters:
            timeout -- maximum wait time in seconds; values <= 0 do not
                       block; None blocks until an event is available

            Returns:
            list of (key, events) for ready file objects; `events` is
            masked with the events the key was registered for.  An empty
            list is returned if the wait is interrupted.
            """
            timeout = None if timeout is None else max(timeout, 0)
            # If max_ev is 0, kqueue ignores the timeout and returns
            # immediately.  Ask for at least one event so that select()
            # honours the timeout even when no FD is registered, which is
            # also what EpollSelector.select() does.
            max_ev = max(len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._selector.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                # Each kevent carries exactly one filter; translate it back
                # to the generic flag.
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Close the kqueue object, then clear the registrations."""
            self._selector.close()
            super().close()
578
579
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
#
# The optional selector classes only exist in the module namespace when
# the platform supports them, so take the first one that is present and
# fall back to SelectSelector, which is always defined.
DefaultSelector = (
    globals().get('KqueueSelector')
    or globals().get('EpollSelector')
    or globals().get('DevpollSelector')
    or globals().get('PollSelector')
    or SelectSelector
)
593