"""Selectors module.

This module allows high-level and efficient I/O multiplexing, built upon the
`select` module primitives.
"""


from abc import ABCMeta, abstractmethod
from collections import namedtuple
from collections.abc import Mapping
import math
import select
import sys


# generic events, that must be mapped to implementation-specific ones
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)


def _fileobj_to_fd(fileobj):
    """Return a file descriptor from a file object.

    Parameters:
    fileobj -- file object or file descriptor

    Returns:
    corresponding file descriptor

    Raises:
    ValueError if the object is invalid
    """
    if isinstance(fileobj, int):
        fd = fileobj
    else:
        try:
            fd = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            # No usable fileno(): report it uniformly as ValueError and hide
            # the underlying exception from the traceback.
            raise ValueError(f"Invalid file object: {fileobj!r}") from None
    if fd < 0:
        raise ValueError(f"Invalid file descriptor: {fd}")
    return fd


SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)

    Object used to associate a file object to its backing
    file descriptor, selected event mask, and attached data.
"""
SelectorKey.fileobj.__doc__ = 'File object registered.'
SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
For example, this could be used to store a per-client session ID.''')


class _SelectorMapping(Mapping):
    """Mapping of file objects to selector keys."""

    def __init__(self, selector):
        # The mapping is a live view: it keeps no state of its own and reads
        # the selector's `_fd_to_key` dict on every access.
        self._selector = selector

    def __len__(self):
        return len(self._selector._fd_to_key)

    def get(self, fileobj, default=None):
        """Return the key registered for fileobj, or default if there is none.

        Raises ValueError if fileobj cannot be resolved to a file descriptor
        (propagated from the selector's `_fileobj_lookup()`).
        """
        fd = self._selector._fileobj_lookup(fileobj)
        return self._selector._fd_to_key.get(fd, default)

    def __getitem__(self, fileobj):
        fd = self._selector._fileobj_lookup(fileobj)
        key = self._selector._fd_to_key.get(fd)
        if key is None:
            raise KeyError(f"{fileobj!r} is not registered")
        return key

    def __iter__(self):
        # Iterates over the keys of `_fd_to_key`, i.e. file descriptors.
        return iter(self._selector._fd_to_key)


class BaseSelector(metaclass=ABCMeta):
    """Selector abstract base class.

    A selector supports registering file objects to be monitored for specific
    I/O events.

    A file object is a file descriptor or any object with a `fileno()` method.
    An arbitrary object can be attached to the file object, which can be used
    for example to store context information, a callback, etc.

    A selector can use various implementations (select(), poll(), epoll()...)
    depending on the platform. The default `Selector` class uses the most
    efficient implementation on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Register a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Unregister a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Change a registered file object monitored events or attached data.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        return self.register(fileobj, events, data)

    @abstractmethod
    def select(self, timeout=None):
        """Perform the actual selection, until some monitored file objects are
        ready or a timeout expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is freed.
        """
        pass

    def get_key(self, fileobj):
        """Return the key associated to a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        mapping = self.get_map()
        if mapping is None:
            raise RuntimeError('Selector is closed')
        try:
            return mapping[fileobj]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class _BaseSelectorImpl(BaseSelector):
    """Base selector implementation."""

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}
        # read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        This wraps _fileobj_to_fd() to do an exhaustive search in case
        the object is invalid but we still have it in our map.  This
        is used by unregister() so we can unregister an object that
        was previously registered even if it is closed.  It is also
        used by _SelectorMapping.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Do an exhaustive search (by identity, not equality).
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd
            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        """Record fileobj in the fd -> key map and return its SelectorKey.

        Raises ValueError if events is empty or has bits other than
        EVENT_READ|EVENT_WRITE, or if fileobj cannot be resolved to a file
        descriptor; raises KeyError if that descriptor is already registered.
        """
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError(f"Invalid events: {events!r}")

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError(f"{fileobj!r} (FD {key.fd}) is already registered")

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        """Remove fileobj from the fd -> key map and return its SelectorKey.

        Raises KeyError if fileobj is not registered.
        """
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None
        return key

    def modify(self, fileobj, events, data=None):
        """Change the events and/or data of a registered file object.

        Returns the resulting SelectorKey (the existing one if nothing
        changed).  Raises KeyError if fileobj is not registered, plus anything
        unregister()/register() raise when the event mask changes.
        """
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None
        if events != key.events:
            # Go through the (possibly overridden) unregister()/register()
            # pair so subclass bookkeeping is updated as well.
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)
        elif data != key.data:
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key
        return key

    def close(self):
        """Forget every registration; get_map() returns None afterwards."""
        self._fd_to_key.clear()
        self._map = None

    def get_map(self):
        return self._map


class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""

    def __init__(self):
        super().__init__()
        # File descriptors monitored for reading / writing respectively.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        if events & EVENT_READ:
            self._readers.add(key.fd)
        if events & EVENT_WRITE:
            self._writers.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        self._readers.discard(key.fd)
        self._writers.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On Windows the writers are also passed as the "exceptional"
            # list, and whatever comes back there is reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []
    else:
        _select = select.select

    def select(self, timeout=None):
        """Wait for readiness using select.select().

        A negative timeout is clamped to 0 (non-blocking).  Returns a list of
        (key, events) pairs; an empty list if the call is interrupted.
        """
        timeout = None if timeout is None else max(timeout, 0)
        ready = []
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready
        r = frozenset(r)
        w = frozenset(w)
        rw = r | w
        fd_to_key_get = self._fd_to_key.get
        for fd in rw:
            key = fd_to_key_get(fd)
            # key is None when the fd is no longer in the map.
            if key:
                events = ((fd in r and EVENT_READ)
                          | (fd in w and EVENT_WRITE))
                ready.append((key, events & key.events))
        return ready


class _PollLikeSelector(_BaseSelectorImpl):
    """Base class shared between poll, epoll and devpoll selectors."""
    # Subclasses set these to the poller factory and to the poller-specific
    # flags that correspond to EVENT_READ / EVENT_WRITE.
    _selector_cls = None
    _EVENT_READ = None
    _EVENT_WRITE = None

    def __init__(self):
        super().__init__()
        self._selector = self._selector_cls()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        # Translate the generic mask into the poller's own flags.
        poller_events = ((events & EVENT_READ and self._EVENT_READ)
                         | (events & EVENT_WRITE and self._EVENT_WRITE))
        try:
            self._selector.register(key.fd, poller_events)
        except BaseException:
            # Keep the map consistent with the poller: drop the entry that
            # was just added, then let the original exception propagate.
            super().unregister(fileobj)
            raise
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        try:
            self._selector.unregister(key.fd)
        except OSError:
            # This can happen if the FD was closed since it
            # was registered.
            pass
        return key

    def modify(self, fileobj, events, data=None):
        """Change events and/or data in place, without re-registering.

        Raises KeyError if fileobj is not registered.  If the poller rejects
        the new event mask, the file object is removed from the map and the
        exception propagates.
        """
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

        changed = False
        if events != key.events:
            selector_events = ((events & EVENT_READ and self._EVENT_READ)
                               | (events & EVENT_WRITE and self._EVENT_WRITE))
            try:
                self._selector.modify(key.fd, selector_events)
            except BaseException:
                super().unregister(fileobj)
                raise
            changed = True
        if data != key.data:
            changed = True

        if changed:
            key = key._replace(events=events, data=data)
            self._fd_to_key[key.fd] = key
        return key

    def select(self, timeout=None):
        """Wait for readiness using the poller's poll() method.

        Returns a list of (key, events) pairs; an empty list if the call is
        interrupted.
        """
        # This is shared between poll() and epoll().
        # epoll() has a different signature and handling of timeout parameter.
        if timeout is not None:
            if timeout <= 0:
                timeout = 0
            else:
                # poll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
        ready = []
        try:
            fd_event_list = self._selector.poll(timeout)
        except InterruptedError:
            return ready

        fd_to_key_get = self._fd_to_key.get
        for fd, event in fd_event_list:
            key = fd_to_key_get(fd)
            if key:
                # Any flag other than the read flag is reported as writable
                # and any flag other than the write flag as readable; the
                # result is then restricted to what was asked for.
                events = ((event & ~self._EVENT_READ and EVENT_WRITE)
                          | (event & ~self._EVENT_WRITE and EVENT_READ))
                ready.append((key, events & key.events))
        return ready


if hasattr(select, 'poll'):

    class PollSelector(_PollLikeSelector):
        """Poll-based selector."""
        _selector_cls = select.poll
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT


if hasattr(select, 'epoll'):

    _NOT_EPOLLIN = ~select.EPOLLIN
    _NOT_EPOLLOUT = ~select.EPOLLOUT

    class EpollSelector(_PollLikeSelector):
        """Epoll-based selector."""
        _selector_cls = select.epoll
        _EVENT_READ = select.EPOLLIN
        _EVENT_WRITE = select.EPOLLOUT

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._selector.fileno()

        def select(self, timeout=None):
            """Wait for readiness using epoll.

            Returns a list of (key, events) pairs; an empty list if the call
            is interrupted.
            """
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = len(self._fd_to_key) or 1

            ready = []
            try:
                fd_event_list = self._selector.poll(timeout, max_ev)
            except InterruptedError:
                return ready

            fd_to_key = self._fd_to_key
            for fd, event in fd_event_list:
                key = fd_to_key.get(fd)
                if key:
                    events = ((event & _NOT_EPOLLIN and EVENT_WRITE)
                              | (event & _NOT_EPOLLOUT and EVENT_READ))
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._selector.close()
            super().close()


if hasattr(select, 'devpoll'):

    class DevpollSelector(_PollLikeSelector):
        """Solaris /dev/poll selector."""
        _selector_cls = select.devpoll
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._selector.fileno()

        def close(self):
            self._selector.close()
            super().close()


if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector."""

        def __init__(self):
            super().__init__()
            self._selector = select.kqueue()
            # Number of kevent filters currently added (a file object
            # registered for both read and write counts twice).
            self._max_events = 0

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._selector.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            # Filters successfully added so far, so that a failure half-way
            # through can be rolled back.
            added_filters = []
            try:
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
                    self._max_events += 1
                    added_filters.append(select.KQ_FILTER_READ)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
                    self._max_events += 1
                    added_filters.append(select.KQ_FILTER_WRITE)
            except BaseException:
                # Undo whatever was added before the failure so neither the
                # kqueue nor the event counter keeps a stale entry, then
                # drop the map entry and re-raise the original exception.
                for kq_filter in added_filters:
                    self._max_events -= 1
                    kev = select.kevent(key.fd, kq_filter,
                                        select.KQ_EV_DELETE)
                    try:
                        self._selector.control([kev], 0, 0)
                    except OSError:
                        # Best effort: do not mask the original error.
                        pass
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                self._max_events -= 1
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                self._max_events -= 1
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            """Wait for readiness using kqueue.

            A negative timeout is clamped to 0 (non-blocking).  Returns a
            list of (key, events) pairs; an empty list if the call is
            interrupted.
            """
            timeout = None if timeout is None else max(timeout, 0)
            # If max_ev is 0, kqueue will ignore the timeout. For consistent
            # behavior with the other selector classes, we prevent that here
            # (using `or 1`). See https://bugs.python.org/issue29255
            max_ev = self._max_events or 1
            ready = []
            try:
                kev_list = self._selector.control(None, max_ev, timeout)
            except InterruptedError:
                return ready

            fd_to_key_get = self._fd_to_key.get
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                key = fd_to_key_get(fd)
                if key:
                    events = ((flag == select.KQ_FILTER_READ and EVENT_READ)
                              | (flag == select.KQ_FILTER_WRITE and EVENT_WRITE))
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._selector.close()
            super().close()


def _can_use(method):
    """Check if we can use the selector depending upon the
    operating system.

    Parameters:
    method -- name of the `select` module factory to probe ('poll',
              'epoll', 'kqueue' or 'devpoll')

    Returns:
    True if the factory exists and can actually be used, False otherwise
    """
    # Implementation based upon https://github.com/sethmlarson/selectors2/blob/master/selectors2.py
    selector = getattr(select, method, None)
    if selector is None:
        # select module does not implement method
        return False
    # check if the OS and Kernel actually support the method. Call may fail with
    # OSError: [Errno 38] Function not implemented
    try:
        selector_obj = selector()
        if method == 'poll':
            # check that poll actually works
            selector_obj.poll(0)
        else:
            # close epoll, kqueue, and devpoll fd
            selector_obj.close()
        return True
    except OSError:
        return False


# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if _can_use('kqueue'):
    DefaultSelector = KqueueSelector
elif _can_use('epoll'):
    DefaultSelector = EpollSelector
elif _can_use('devpoll'):
    DefaultSelector = DevpollSelector
elif _can_use('poll'):
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector