• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selectors module.
2
3This module allows high-level and efficient I/O multiplexing, built upon the
4`select` module primitives.
5"""
6
7
8from abc import ABCMeta, abstractmethod
9from collections import namedtuple, Mapping
10import math
11import select
12import sys
13
14
# Platform-neutral event flags.  Each selector implementation translates
# these into the constants of the system facility it wraps.
EVENT_READ = 1 << 0
EVENT_WRITE = 1 << 1
18
19
def _fileobj_to_fd(fileobj):
    """Resolve a file object to its integer file descriptor.

    Parameters:
    fileobj -- an integer file descriptor, or an object with a `fileno()`
               method

    Returns:
    the non-negative file descriptor

    Raises:
    ValueError if `fileobj` cannot be turned into a descriptor, or if the
    resulting descriptor is negative
    """
    if not isinstance(fileobj, int):
        try:
            descriptor = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            # No usable fileno(): report it uniformly as a bad file object
            # and hide the underlying exception.
            message = "Invalid file object: " "{!r}".format(fileobj)
            raise ValueError(message) from None
    else:
        descriptor = fileobj

    if descriptor < 0:
        raise ValueError("Invalid file descriptor: {}".format(descriptor))
    return descriptor
43
44
# Immutable record tying a registered file object to its descriptor, the
# events it is monitored for, and the caller's attached data.
SelectorKey = namedtuple('SelectorKey', 'fileobj fd events data')

SelectorKey.__doc__ = (
    "SelectorKey(fileobj, fd, events, data)\n"
    "\n"
    "    Object used to associate a file object to its backing\n"
    "    file descriptor, selected event mask, and attached data.\n"
)
if sys.version_info >= (3, 5):
    # Per-field docstrings are attached only on Python 3.5 and newer.
    SelectorKey.fileobj.__doc__ = 'File object registered.'
    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
    SelectorKey.events.__doc__ = (
        'Events that must be waited for on this file object.'
    )
    SelectorKey.data.__doc__ = (
        'Optional opaque data associated to this file object.\n'
        '    For example, this could be used to store a per-client session ID.'
    )
58
class _SelectorMapping(Mapping):
    """Mapping view from file objects to the selector keys of a selector.

    The view holds no data of its own: every operation is answered from
    the owning selector's `_fd_to_key` dictionary.
    """

    def __init__(self, selector):
        self._selector = selector

    def __len__(self):
        return len(self._selector._fd_to_key)

    def __getitem__(self, fileobj):
        owner = self._selector
        try:
            descriptor = owner._fileobj_lookup(fileobj)
            found = owner._fd_to_key[descriptor]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return found

    def __iter__(self):
        # Iteration yields the file descriptors used as dictionary keys.
        return iter(self._selector._fd_to_key)
77
78
class BaseSelector(metaclass=ABCMeta):
    """Abstract interface shared by every selector.

    A selector keeps track of registered file objects and waits until one
    or more of them are ready for the I/O events they were registered for.

    A file object is either an integer file descriptor or any object with
    a `fileno()` method.  Arbitrary data (context information, a callback,
    etc.) may be attached to each registration.

    Concrete subclasses are built on a specific system facility (select(),
    poll(), epoll()...).  `DefaultSelector` refers to the most efficient
    implementation available on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Start monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Stop monitoring a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Replace the monitored events or attached data of a registration.

        The generic implementation simply unregisters the file object and
        registers it again with the new settings.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        new_key = self.register(fileobj, events, data)
        return new_key

    @abstractmethod
    def select(self, timeout=None):
        """Wait until some monitored file objects are ready or the timeout
        expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is
        freed.  The base implementation does nothing.
        """

    def get_key(self, fileobj):
        """Look up the key of a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        registered = self.get_map()
        if registered is None:
            raise RuntimeError('Selector is closed')
        try:
            found = registered[fileobj]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return found

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Leaving the `with` block always closes the selector.
        self.close()
203
204
class _BaseSelectorImpl(BaseSelector):
    """Bookkeeping shared by the concrete selector classes."""

    def __init__(self):
        # Registered keys, indexed by file descriptor.
        self._fd_to_key = {}
        # Mapping view handed out by get_map(); close() resets it to None.
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return the file descriptor for a file object.

        Tries _fileobj_to_fd() first.  If that rejects the object, the
        registered keys are scanned for this very object (identity
        comparison) so that something registered earlier can still be
        found after it has become invalid, e.g. because it was closed.
        The original ValueError is re-raised when the scan finds nothing.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            matches = [known.fd for known in self._fd_to_key.values()
                       if known.fileobj is fileobj]
            if not matches:
                raise
            return matches[0]

    def register(self, fileobj, events, data=None):
        unknown_bits = ~(EVENT_READ | EVENT_WRITE)
        if not events or events & unknown_bits:
            raise ValueError("Invalid events: {!r}".format(events))

        fd = self._fileobj_lookup(fileobj)
        if fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, fd))

        new_key = SelectorKey(fileobj, fd, events, data)
        self._fd_to_key[fd] = new_key
        return new_key

    def unregister(self, fileobj):
        try:
            fd = self._fileobj_lookup(fileobj)
            return self._fd_to_key.pop(fd)
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def modify(self, fileobj, events, data=None):
        # TODO: Subclasses can probably optimize this even further.
        try:
            current = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

        if events != current.events:
            # A different event mask needs a full re-registration.
            self.unregister(fileobj)
            return self.register(fileobj, events, data)

        if data != current.data:
            # Only the attached data changed: swap the stored key in place.
            current = current._replace(data=data)
            self._fd_to_key[current.fd] = current
        return current

    def close(self):
        self._fd_to_key.clear()
        self._map = None

    def get_map(self):
        return self._map

    def _key_from_fd(self, fd):
        """Return the key registered for a file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        return self._fd_to_key.get(fd)
288
289
class SelectSelector(_BaseSelectorImpl):
    """Selector built on select.select()."""

    def __init__(self):
        super().__init__()
        # Descriptors currently monitored for reading / writing.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        for flag, watched in ((EVENT_READ, self._readers),
                              (EVENT_WRITE, self._writers)):
            if events & flag:
                watched.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        for watched in (self._readers, self._writers):
            watched.discard(key.fd)
        return key

    if sys.platform != 'win32':
        _select = select.select
    else:
        def _select(self, r, w, _, timeout=None):
            # On win32 the writers are also passed as the third
            # ("exceptional") set, and whatever comes back there is
            # reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []

    def select(self, timeout=None):
        if timeout is not None:
            # Negative timeouts are treated as "do not block".
            timeout = max(timeout, 0)
        ready = []
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready
        r = set(r)
        w = set(w)
        for fd in r | w:
            key = self._key_from_fd(fd)
            if not key:
                continue
            events = ((EVENT_READ if fd in r else 0)
                      | (EVENT_WRITE if fd in w else 0))
            # Report only events the caller registered for.
            ready.append((key, events & key.events))
        return ready
339
340
if hasattr(select, 'poll'):

    class PollSelector(_BaseSelectorImpl):
        """Poll-based selector.

        Only defined when the `select` module provides `poll`.
        """

        def __init__(self):
            super().__init__()
            self._poll = select.poll()

        def register(self, fileobj, events, data=None):
            """Register a file object with the bookkeeping and the poll object.

            If the poll object rejects the descriptor, the bookkeeping entry
            is removed again before the exception propagates, so a failed
            registration leaves no stale key behind.
            """
            key = super().register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._poll.register(key.fd, poll_events)
            except BaseException:
                # Roll back so _fd_to_key and the poll object stay in sync.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister a file object.

            Raises KeyError if fileobj is not registered.  An OSError from
            the poll object is swallowed, as the unregister() contract
            requires.
            """
            key = super().unregister(fileobj)
            try:
                self._poll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            """Wait for ready file objects.

            Parameters:
            timeout -- seconds to wait; None blocks, values <= 0 do not block

            Returns:
            list of (key, events) pairs; empty if the wait was interrupted
            """
            if timeout is None:
                timeout = None
            elif timeout <= 0:
                timeout = 0
            else:
                # poll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
            ready = []
            try:
                fd_event_list = self._poll.poll(timeout)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Any flag other than POLLIN counts as "writable" and any
                # flag other than POLLOUT counts as "readable", so extra
                # condition flags are reported in both directions; the mask
                # with key.events below trims it to what was requested.
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready
390
391
if hasattr(select, 'epoll'):

    class EpollSelector(_BaseSelectorImpl):
        """Selector built on select.epoll()."""

        def __init__(self):
            super().__init__()
            self._epoll = select.epoll()

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._epoll.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            mask = ((select.EPOLLIN if events & EVENT_READ else 0)
                    | (select.EPOLLOUT if events & EVENT_WRITE else 0))
            try:
                self._epoll.register(key.fd, mask)
            except BaseException:
                # Undo the bookkeeping entry before propagating the error.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            try:
                self._epoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            if timeout is None:
                wait = -1
            elif timeout <= 0:
                wait = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                wait = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # select() must remain callable when no FD is registered.
            max_ev = len(self._fd_to_key) or 1

            ready = []
            try:
                polled = self._epoll.poll(wait, max_ev)
            except InterruptedError:
                return ready
            for fd, event in polled:
                key = self._key_from_fd(fd)
                if not key:
                    continue
                events = 0
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ
                # Report only events the caller registered for.
                ready.append((key, events & key.events))
            return ready

        def close(self):
            self._epoll.close()
            super().close()
463
464
if hasattr(select, 'devpoll'):

    class DevpollSelector(_BaseSelectorImpl):
        """Solaris /dev/poll selector.

        Only defined when the `select` module provides `devpoll`.
        """

        def __init__(self):
            super().__init__()
            self._devpoll = select.devpoll()

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._devpoll.fileno()

        def register(self, fileobj, events, data=None):
            """Register a file object with the bookkeeping and /dev/poll.

            If the devpoll object rejects the descriptor, the bookkeeping
            entry is removed again before the exception propagates, so a
            failed registration leaves no stale key behind.
            """
            key = super().register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._devpoll.register(key.fd, poll_events)
            except BaseException:
                # Roll back so _fd_to_key and the devpoll object stay in sync.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister a file object.

            Raises KeyError if fileobj is not registered.  An OSError from
            the devpoll object is swallowed, as the unregister() contract
            requires.
            """
            key = super().unregister(fileobj)
            try:
                self._devpoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            """Wait for ready file objects.

            Parameters:
            timeout -- seconds to wait; None blocks, values <= 0 do not block

            Returns:
            list of (key, events) pairs; empty if the wait was interrupted
            """
            if timeout is None:
                timeout = None
            elif timeout <= 0:
                timeout = 0
            else:
                # devpoll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
            ready = []
            try:
                fd_event_list = self._devpoll.poll(timeout)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Any flag other than POLLIN counts as "writable" and any
                # flag other than POLLOUT counts as "readable"; the mask
                # with key.events below trims it to what was requested.
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Close the devpoll object, then clear the bookkeeping."""
            self._devpoll.close()
            super().close()
521
522
if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector.

        Only defined when the `select` module provides `kqueue`.
        """

        def __init__(self):
            super().__init__()
            self._kqueue = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._kqueue.fileno()

        def register(self, fileobj, events, data=None):
            """Register a file object, adding one kevent filter per event.

            If adding a filter fails, the bookkeeping entry is removed
            again before the exception propagates.
            """
            key = super().register(fileobj, events, data)
            try:
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
            except BaseException:
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            """Unregister a file object, deleting its kevent filters.

            Raises KeyError if fileobj is not registered.  An OSError from
            deleting a filter is ignored.
            """
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            """Wait for ready file objects.

            Parameters:
            timeout -- seconds to wait; None blocks, negative values are
                       clamped to 0 (do not block)

            Returns:
            list of (key, events) pairs; empty if the wait was interrupted
            """
            timeout = None if timeout is None else max(timeout, 0)
            # When asked for zero events kqueue returns immediately and
            # ignores the timeout.  Request at least one so that select()
            # on an empty selector still honours `timeout`, as the other
            # selector classes do.
            max_ev = max(len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._kqueue.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            """Close the kqueue object, then clear the bookkeeping."""
            self._kqueue.close()
            super().close()
597
598
# Pick the implementation to expose as DefaultSelector.  Candidates are
# tried in this order and the first class defined on this platform wins:
#    kqueue, epoll, devpoll, poll, then select as the fallback.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
DefaultSelector = (
    globals().get('KqueueSelector')
    or globals().get('EpollSelector')
    or globals().get('DevpollSelector')
    or globals().get('PollSelector')
    or SelectSelector
)
612