"""Selectors module.

This module allows high-level and efficient I/O multiplexing, built upon the
`select` module primitives.
"""


from abc import ABCMeta, abstractmethod
from collections import namedtuple
from collections.abc import Mapping
import math
import select
import sys


# generic events, that must be mapped to implementation-specific ones
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)


def _fileobj_to_fd(fileobj):
    """Return a file descriptor from a file object.

    Parameters:
    fileobj -- file object or file descriptor

    Returns:
    corresponding file descriptor

    Raises:
    ValueError if the object is invalid
    """
    if isinstance(fileobj, int):
        fd = fileobj
    else:
        try:
            fd = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            # Normalize every "this is not a usable file object" failure to
            # ValueError and hide the original traceback context.
            raise ValueError("Invalid file object: "
                             "{!r}".format(fileobj)) from None
    if fd < 0:
        raise ValueError("Invalid file descriptor: {}".format(fd))
    return fd


SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)

    Object used to associate a file object to its backing
    file descriptor, selected event mask, and attached data.
"""
if sys.version_info >= (3, 5):
    SelectorKey.fileobj.__doc__ = 'File object registered.'
    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
    SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
    SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
    For example, this could be used to store a per-client session ID.''')


class _SelectorMapping(Mapping):
    """Mapping of file objects to selector keys.

    This is a read-only live view over the owning selector's
    `_fd_to_key` dictionary; iteration yields file descriptors.
    """

    def __init__(self, selector):
        self._selector = selector

    def __len__(self):
        return len(self._selector._fd_to_key)

    def __getitem__(self, fileobj):
        try:
            fd = self._selector._fileobj_lookup(fileobj)
            return self._selector._fd_to_key[fd]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def __iter__(self):
        return iter(self._selector._fd_to_key)


class BaseSelector(metaclass=ABCMeta):
    """Selector abstract base class.

    A selector supports registering file objects to be monitored for specific
    I/O events.

    A file object is a file descriptor or any object with a `fileno()` method.
    An arbitrary object can be attached to the file object, which can be used
    for example to store context information, a callback, etc.

    A selector can use various implementations (select(), poll(), epoll()...)
    depending on the platform. The default `Selector` class uses the most
    efficient implementation on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Register a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Unregister a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Change a registered file object monitored events or attached data.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        # Generic fallback: drop the registration and re-create it.
        self.unregister(fileobj)
        return self.register(fileobj, events, data)

    @abstractmethod
    def select(self, timeout=None):
        """Perform the actual selection, until some monitored file objects are
        ready or a timeout expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is freed.
        """
        pass

    def get_key(self, fileobj):
        """Return the key associated to a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        mapping = self.get_map()
        if mapping is None:
            raise RuntimeError('Selector is closed')
        try:
            return mapping[fileobj]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class _BaseSelectorImpl(BaseSelector):
    """Base selector implementation.

    Keeps the fd -> SelectorKey bookkeeping shared by every concrete
    selector; subclasses add the actual system-call handling.
    """

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}
        # read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        This wraps _fileobj_to_fd() to do an exhaustive search in case
        the object is invalid but we still have it in our map.  This
        is used by unregister() so we can unregister an object that
        was previously registered even if it is closed.  It is also
        used by _SelectorMapping.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Do an exhaustive search.
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd
            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        """Record a new registration and return its SelectorKey.

        Raises ValueError for an empty or unknown event mask and KeyError
        if the file descriptor is already registered.
        """
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError("Invalid events: {!r}".format(events))

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, key.fd))

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        """Remove a registration and return the SelectorKey it had.

        Raises KeyError if fileobj is not registered.
        """
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return key

    def modify(self, fileobj, events, data=None):
        """Update the events and/or data of a registered file object.

        Returns the (possibly new) SelectorKey; raises KeyError if fileobj
        is not registered.
        """
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        if events != key.events:
            # Go through the (possibly overridden) unregister()/register()
            # pair so subclass bookkeeping stays in sync.
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)
        elif data != key.data:
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key
        return key

    def close(self):
        """Forget every registration and mark the selector as closed."""
        self._fd_to_key.clear()
        # get_map() returning None is how get_key() detects a closed selector.
        self._map = None

    def get_map(self):
        """Return the read-only mapping, or None once close() was called."""
        return self._map

    def _key_from_fd(self, fd):
        """Return the key associated to a given file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        return self._fd_to_key.get(fd)


class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""

    def __init__(self):
        super().__init__()
        # File descriptors handed to select() as its read and write sets.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        if events & EVENT_READ:
            self._readers.add(key.fd)
        if events & EVENT_WRITE:
            self._writers.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        self._readers.discard(key.fd)
        self._writers.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On win32 the writers are also passed as the "exceptional" set
            # and whatever comes back there is reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []
    else:
        _select = select.select

    def select(self, timeout=None):
        # Negative timeouts are clamped to 0 (non-blocking poll).
        timeout = None if timeout is None else max(timeout, 0)
        ready = []
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready
        r = set(r)
        w = set(w)
        for fd in r | w:
            events = 0
            if fd in r:
                events |= EVENT_READ
            if fd in w:
                events |= EVENT_WRITE

            key = self._key_from_fd(fd)
            if key:
                # Only report events the caller actually asked for.
                ready.append((key, events & key.events))
        return ready


class _PollLikeSelector(_BaseSelectorImpl):
    """Base class shared between poll, epoll and devpoll selectors."""
    # Subclasses set these to the poller factory and its read/write flags.
    _selector_cls = None
    _EVENT_READ = None
    _EVENT_WRITE = None

    def __init__(self):
        super().__init__()
        self._selector = self._selector_cls()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        poller_events = 0
        if events & EVENT_READ:
            poller_events |= self._EVENT_READ
        if events & EVENT_WRITE:
            poller_events |= self._EVENT_WRITE
        try:
            self._selector.register(key.fd, poller_events)
        except BaseException:
            # Roll back our own bookkeeping, then let the error propagate.
            super().unregister(fileobj)
            raise
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        try:
            self._selector.unregister(key.fd)
        except OSError:
            # This can happen if the FD was closed since it
            # was registered.
            pass
        return key

    def modify(self, fileobj, events, data=None):
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

        changed = False
        if events != key.events:
            selector_events = 0
            if events & EVENT_READ:
                selector_events |= self._EVENT_READ
            if events & EVENT_WRITE:
                selector_events |= self._EVENT_WRITE
            try:
                self._selector.modify(key.fd, selector_events)
            except BaseException:
                # The poller rejected the change: drop the registration from
                # our map and re-raise.
                super().unregister(fileobj)
                raise
            changed = True
        if data != key.data:
            changed = True

        if changed:
            key = key._replace(events=events, data=data)
            self._fd_to_key[key.fd] = key
        return key

    def select(self, timeout=None):
        # This is shared between poll() and epoll().
        # epoll() has a different signature and handling of timeout parameter.
        if timeout is not None:
            if timeout <= 0:
                timeout = 0
            else:
                # poll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
        ready = []
        try:
            fd_event_list = self._selector.poll(timeout)
        except InterruptedError:
            return ready
        for fd, event in fd_event_list:
            events = 0
            # Any flag other than the read flag counts as "writable" and any
            # flag other than the write flag counts as "readable"; the mask
            # with key.events below trims the result to what was requested.
            if event & ~self._EVENT_READ:
                events |= EVENT_WRITE
            if event & ~self._EVENT_WRITE:
                events |= EVENT_READ

            key = self._key_from_fd(fd)
            if key:
                ready.append((key, events & key.events))
        return ready


if hasattr(select, 'poll'):

    class PollSelector(_PollLikeSelector):
        """Poll-based selector."""
        _selector_cls = select.poll
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT


if hasattr(select, 'epoll'):

    class EpollSelector(_PollLikeSelector):
        """Epoll-based selector."""
        _selector_cls = select.epoll
        _EVENT_READ = select.EPOLLIN
        _EVENT_WRITE = select.EPOLLOUT

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._selector.fileno()

        def select(self, timeout=None):
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = max(len(self._fd_to_key), 1)

            ready = []
            try:
                fd_event_list = self._selector.poll(timeout, max_ev)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Same flag mapping as _PollLikeSelector.select().
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._selector.close()
            super().close()


if hasattr(select, 'devpoll'):

    class DevpollSelector(_PollLikeSelector):
        """Solaris /dev/poll selector."""
        _selector_cls = select.devpoll
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._selector.fileno()

        def close(self):
            self._selector.close()
            super().close()


if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector."""

        def __init__(self):
            super().__init__()
            self._selector = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._selector.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            try:
                # One kevent (filter) is added per requested direction.
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
            except BaseException:
                # Roll back our own bookkeeping, then let the error propagate.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            timeout = None if timeout is None else max(timeout, 0)
            # register() adds one kevent per direction, so a single FD can
            # produce two events (read and write).  Size the result buffer
            # for that worst case so no ready event is cut off.
            # If max_ev is 0, kqueue will ignore the timeout. For consistent
            # behavior with the other selector classes, we prevent that here
            # (using max). See https://bugs.python.org/issue29255
            max_ev = max(2 * len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._selector.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._selector.close()
            super().close()


def _can_use(method):
    """Check if we can use the selector depending upon the
    operating system. """
    # Implementation based upon https://github.com/sethmlarson/selectors2/blob/master/selectors2.py
    selector = getattr(select, method, None)
    if selector is None:
        # select module does not implement method
        return False
    # check if the OS and Kernel actually support the method. Call may fail with
    # OSError: [Errno 38] Function not implemented
    try:
        selector_obj = selector()
        if method == 'poll':
            # check that poll actually works
            selector_obj.poll(0)
        else:
            # close epoll, kqueue, and devpoll fd
            selector_obj.close()
        return True
    except OSError:
        return False


# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if _can_use('kqueue'):
    DefaultSelector = KqueueSelector
elif _can_use('epoll'):
    DefaultSelector = EpollSelector
elif _can_use('devpoll'):
    DefaultSelector = DevpollSelector
elif _can_use('poll'):
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector