"""Selectors module.

This module allows high-level and efficient I/O multiplexing, built upon the
`select` module primitives.
"""


from abc import ABCMeta, abstractmethod
from collections import namedtuple
# The ABC aliases were removed from `collections` in Python 3.10; the
# canonical location (available since 3.3) is `collections.abc`.
from collections.abc import Mapping
import math
import select
import sys


# generic events, that must be mapped to implementation-specific ones
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)


def _fileobj_to_fd(fileobj):
    """Return a file descriptor from a file object.

    Parameters:
    fileobj -- file object or file descriptor

    Returns:
    corresponding file descriptor

    Raises:
    ValueError if the object is invalid
    """
    if isinstance(fileobj, int):
        fd = fileobj
    else:
        try:
            fd = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            raise ValueError("Invalid file object: "
                             "{!r}".format(fileobj)) from None
    if fd < 0:
        raise ValueError("Invalid file descriptor: {}".format(fd))
    return fd


SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)

    Object used to associate a file object to its backing
    file descriptor, selected event mask, and attached data.
"""
if sys.version_info >= (3, 5):
    # Per-field docstrings of a namedtuple are only writable from 3.5 on.
    SelectorKey.fileobj.__doc__ = 'File object registered.'
    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
    SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
    SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
    For example, this could be used to store a per-client session ID.''')


class _SelectorMapping(Mapping):
    """Mapping of file objects to selector keys.

    This is a read-only view over the owning selector's `_fd_to_key`
    dictionary: lookups accept a file object or a file descriptor, while
    iteration yields the registered file descriptors.
    """

    def __init__(self, selector):
        self._selector = selector

    def __len__(self):
        return len(self._selector._fd_to_key)

    def __getitem__(self, fileobj):
        try:
            fd = self._selector._fileobj_lookup(fileobj)
            return self._selector._fd_to_key[fd]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    def __iter__(self):
        return iter(self._selector._fd_to_key)


class BaseSelector(metaclass=ABCMeta):
    """Selector abstract base class.

    A selector supports registering file objects to be monitored for specific
    I/O events.

    A file object is a file descriptor or any object with a `fileno()` method.
    An arbitrary object can be attached to the file object, which can be used
    for example to store context information, a callback, etc.

    A selector can use various implementations (select(), poll(), epoll()...)
    depending on the platform. The default `DefaultSelector` class uses the
    most efficient implementation on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Register a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Unregister a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Change a registered file object monitored events or attached data.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        return self.register(fileobj, events, data)

    @abstractmethod
    def select(self, timeout=None):
        """Perform the actual selection, until some monitored file objects are
        ready or a timeout expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is freed.
        """
        pass

    def get_key(self, fileobj):
        """Return the key associated to a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        mapping = self.get_map()
        if mapping is None:
            raise RuntimeError('Selector is closed')
        try:
            return mapping[fileobj]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class _BaseSelectorImpl(BaseSelector):
    """Base selector implementation.

    Keeps the fd -> SelectorKey bookkeeping shared by every concrete
    selector; subclasses add the calls to the underlying polling primitive.
    """

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}
        # read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        This wraps _fileobj_to_fd() to do an exhaustive search in case
        the object is invalid but we still have it in our map.  This
        is used by unregister() so we can unregister an object that
        was previously registered even if it is closed.  It is also
        used by _SelectorMapping.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Do an exhaustive search.
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd
            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        """Record fileobj in the fd map and return its new SelectorKey.

        Raises ValueError for an empty or unknown event mask (or an invalid
        file object) and KeyError if the descriptor is already registered.
        """
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError("Invalid events: {!r}".format(events))

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError("{!r} (FD {}) is already registered"
                           .format(fileobj, key.fd))

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        """Remove fileobj from the fd map and return its SelectorKey.

        Raises KeyError if fileobj is not registered.
        """
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        return key

    def modify(self, fileobj, events, data=None):
        """Update the events and/or data of a registered file object.

        A change of event mask goes through unregister()/register(); a change
        of data alone only swaps the stored key.  Returns the current key.
        """
        # TODO: Subclasses can probably optimize this even further.
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError("{!r} is not registered".format(fileobj)) from None
        if events != key.events:
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)
        elif data != key.data:
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key
        return key

    def close(self):
        """Forget every registration; get_map() returns None afterwards."""
        self._fd_to_key.clear()
        self._map = None

    def get_map(self):
        """Return the read-only mapping, or None once the selector is closed."""
        return self._map

    def _key_from_fd(self, fd):
        """Return the key associated to a given file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        try:
            return self._fd_to_key[fd]
        except KeyError:
            return None


class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""

    def __init__(self):
        super().__init__()
        # File descriptors monitored for reading / writing respectively.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        if events & EVENT_READ:
            self._readers.add(key.fd)
        if events & EVENT_WRITE:
            self._writers.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        self._readers.discard(key.fd)
        self._writers.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # On Windows the writers are also passed as the "exceptional"
            # set, and whatever comes back there is reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []
    else:
        _select = select.select

    def select(self, timeout=None):
        # A negative timeout means "do not block".
        timeout = None if timeout is None else max(timeout, 0)
        ready = []
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready
        r = set(r)
        w = set(w)
        for fd in r | w:
            events = 0
            if fd in r:
                events |= EVENT_READ
            if fd in w:
                events |= EVENT_WRITE

            key = self._key_from_fd(fd)
            if key:
                ready.append((key, events & key.events))
        return ready


if hasattr(select, 'poll'):

    class PollSelector(_BaseSelectorImpl):
        """Poll-based selector."""

        def __init__(self):
            super().__init__()
            self._poll = select.poll()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._poll.register(key.fd, poll_events)
            except BaseException:
                # Roll back the bookkeeping so a failed registration does
                # not leave a stale key behind.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            try:
                self._poll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0
                else:
                    # poll() has a resolution of 1 millisecond, round away
                    # from zero to wait *at least* timeout seconds.
                    timeout = math.ceil(timeout * 1e3)
            ready = []
            try:
                fd_event_list = self._poll.poll(timeout)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Any bit other than POLLIN (POLLOUT, but also error or
                # hang-up bits) is reported as writable, and any bit other
                # than POLLOUT as readable; the mask is then narrowed to
                # the events the caller registered for.
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready


if hasattr(select, 'epoll'):

    class EpollSelector(_BaseSelectorImpl):
        """Epoll-based selector."""

        def __init__(self):
            super().__init__()
            self._epoll = select.epoll()

        def fileno(self):
            """Return the file descriptor of the underlying epoll object."""
            return self._epoll.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            epoll_events = 0
            if events & EVENT_READ:
                epoll_events |= select.EPOLLIN
            if events & EVENT_WRITE:
                epoll_events |= select.EPOLLOUT
            try:
                self._epoll.register(key.fd, epoll_events)
            except BaseException:
                # Keep the fd map consistent with the epoll object.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            try:
                self._epoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = max(len(self._fd_to_key), 1)

            ready = []
            try:
                fd_event_list = self._epoll.poll(timeout, max_ev)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Bits other than EPOLLIN count as writable and bits other
                # than EPOLLOUT as readable, then the result is masked with
                # the registered events.
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._epoll.close()
            super().close()


if hasattr(select, 'devpoll'):

    class DevpollSelector(_BaseSelectorImpl):
        """Solaris /dev/poll selector."""

        def __init__(self):
            super().__init__()
            self._devpoll = select.devpoll()

        def fileno(self):
            """Return the file descriptor of the underlying devpoll object."""
            return self._devpoll.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            poll_events = 0
            if events & EVENT_READ:
                poll_events |= select.POLLIN
            if events & EVENT_WRITE:
                poll_events |= select.POLLOUT
            try:
                self._devpoll.register(key.fd, poll_events)
            except BaseException:
                # Roll back the bookkeeping so a failed registration does
                # not leave a stale key behind.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            try:
                self._devpoll.unregister(key.fd)
            except OSError:
                # This can happen if the FD was closed since it
                # was registered.
                pass
            return key

        def select(self, timeout=None):
            if timeout is not None:
                if timeout <= 0:
                    timeout = 0
                else:
                    # devpoll() has a resolution of 1 millisecond, round away
                    # from zero to wait *at least* timeout seconds.
                    timeout = math.ceil(timeout * 1e3)
            ready = []
            try:
                fd_event_list = self._devpoll.poll(timeout)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                # Same mapping as PollSelector: non-POLLIN bits -> writable,
                # non-POLLOUT bits -> readable, masked with key.events.
                if event & ~select.POLLIN:
                    events |= EVENT_WRITE
                if event & ~select.POLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._devpoll.close()
            super().close()


if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector."""

        def __init__(self):
            super().__init__()
            self._kqueue = select.kqueue()

        def fileno(self):
            """Return the file descriptor of the underlying kqueue object."""
            return self._kqueue.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            try:
                # Read and write interest are separate kqueue filters.
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._kqueue.control([kev], 0, 0)
            except BaseException:
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._kqueue.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            timeout = None if timeout is None else max(timeout, 0)
            # With max_ev == 0 kevent() returns immediately and the timeout
            # is ignored; ask for at least one event so that select() on a
            # selector with nothing registered honours the timeout like the
            # other implementations do.
            max_ev = max(len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._kqueue.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._kqueue.close()
            super().close()


# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector