"""Selectors module.

This module allows high-level and efficient I/O multiplexing, built upon the
`select` module primitives.
"""


from abc import ABCMeta, abstractmethod
from collections import namedtuple
from collections.abc import Mapping
import math
import select
import sys


# generic events, that must be mapped to implementation-specific ones
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)


def _fileobj_to_fd(fileobj):
    """Return a file descriptor from a file object.

    Parameters:
    fileobj -- file object or file descriptor

    Returns:
    corresponding file descriptor

    Raises:
    ValueError if the object is invalid
    """
    if isinstance(fileobj, int):
        fd = fileobj
    else:
        try:
            fd = int(fileobj.fileno())
        except (AttributeError, TypeError, ValueError):
            # No usable fileno(): report it uniformly as an invalid object.
            raise ValueError(f"Invalid file object: {fileobj!r}") from None
    if fd < 0:
        raise ValueError(f"Invalid file descriptor: {fd}")
    return fd


SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data)

    Object used to associate a file object to its backing
    file descriptor, selected event mask, and attached data.
"""
if sys.version_info >= (3, 5):
    SelectorKey.fileobj.__doc__ = 'File object registered.'
    SelectorKey.fd.__doc__ = 'Underlying file descriptor.'
    SelectorKey.events.__doc__ = 'Events that must be waited for on this file object.'
    SelectorKey.data.__doc__ = ('''Optional opaque data associated to this file object.
    For example, this could be used to store a per-client session ID.''')


class _SelectorMapping(Mapping):
    """Mapping of file objects to selector keys.

    This is a read-only live view over the `_fd_to_key` dict of the
    selector it wraps.
    """

    def __init__(self, selector):
        self._selector = selector

    def __len__(self):
        return len(self._selector._fd_to_key)

    def __getitem__(self, fileobj):
        try:
            fd = self._selector._fileobj_lookup(fileobj)
            return self._selector._fd_to_key[fd]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

    def __iter__(self):
        # Iterates over the registered file descriptors.
        return iter(self._selector._fd_to_key)


class BaseSelector(metaclass=ABCMeta):
    """Selector abstract base class.

    A selector supports registering file objects to be monitored for specific
    I/O events.

    A file object is a file descriptor or any object with a `fileno()` method.
    An arbitrary object can be attached to the file object, which can be used
    for example to store context information, a callback, etc.

    A selector can use various implementations (select(), poll(), epoll()...)
    depending on the platform. The default `Selector` class uses the most
    efficient implementation on the current platform.
    """

    @abstractmethod
    def register(self, fileobj, events, data=None):
        """Register a file object.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        ValueError if events is invalid
        KeyError if fileobj is already registered
        OSError if fileobj is closed or otherwise is unacceptable to
                the underlying system call (if a system call is made)

        Note:
        OSError may or may not be raised
        """
        raise NotImplementedError

    @abstractmethod
    def unregister(self, fileobj):
        """Unregister a file object.

        Parameters:
        fileobj -- file object or file descriptor

        Returns:
        SelectorKey instance

        Raises:
        KeyError if fileobj is not registered

        Note:
        If fileobj is registered but has since been closed this does
        *not* raise OSError (even if the wrapped syscall does)
        """
        raise NotImplementedError

    def modify(self, fileobj, events, data=None):
        """Change a registered file object monitored events or attached data.

        Parameters:
        fileobj -- file object or file descriptor
        events  -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
        data    -- attached data

        Returns:
        SelectorKey instance

        Raises:
        Anything that unregister() or register() raises
        """
        self.unregister(fileobj)
        return self.register(fileobj, events, data)

    @abstractmethod
    def select(self, timeout=None):
        """Perform the actual selection, until some monitored file objects are
        ready or a timeout expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout <= 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (key, events) for ready file objects
        `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
        """
        raise NotImplementedError

    def close(self):
        """Close the selector.

        This must be called to make sure that any underlying resource is freed.
        """
        pass

    def get_key(self, fileobj):
        """Return the key associated to a registered file object.

        Returns:
        SelectorKey for this file object

        Raises:
        RuntimeError if get_map() returns None (the selector is closed)
        KeyError if fileobj is not registered
        """
        mapping = self.get_map()
        if mapping is None:
            raise RuntimeError('Selector is closed')
        try:
            return mapping[fileobj]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

    @abstractmethod
    def get_map(self):
        """Return a mapping of file objects to selector keys."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class _BaseSelectorImpl(BaseSelector):
    """Base selector implementation."""

    def __init__(self):
        # this maps file descriptors to keys
        self._fd_to_key = {}
        # read-only mapping returned by get_map()
        self._map = _SelectorMapping(self)

    def _fileobj_lookup(self, fileobj):
        """Return a file descriptor from a file object.

        This wraps _fileobj_to_fd() to do an exhaustive search in case
        the object is invalid but we still have it in our map.  This
        is used by unregister() so we can unregister an object that
        was previously registered even if it is closed.  It is also
        used by _SelectorMapping.
        """
        try:
            return _fileobj_to_fd(fileobj)
        except ValueError:
            # Do an exhaustive search.
            for key in self._fd_to_key.values():
                if key.fileobj is fileobj:
                    return key.fd
            # Raise ValueError after all.
            raise

    def register(self, fileobj, events, data=None):
        # Reject an empty mask and any bit outside EVENT_READ|EVENT_WRITE.
        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
            raise ValueError(f"Invalid events: {events!r}")

        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

        if key.fd in self._fd_to_key:
            raise KeyError(f"{fileobj!r} (FD {key.fd}) is already registered")

        self._fd_to_key[key.fd] = key
        return key

    def unregister(self, fileobj):
        try:
            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None
        return key

    def modify(self, fileobj, events, data=None):
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None
        if events != key.events:
            # Go through the (possibly overridden) unregister()/register()
            # pair so that subclasses update their own bookkeeping.
            self.unregister(fileobj)
            key = self.register(fileobj, events, data)
        elif data != key.data:
            # Use a shortcut to update the data.
            key = key._replace(data=data)
            self._fd_to_key[key.fd] = key
        return key

    def close(self):
        self._fd_to_key.clear()
        # get_map() returns None from now on; get_key() turns that into
        # RuntimeError('Selector is closed').
        self._map = None

    def get_map(self):
        return self._map

    def _key_from_fd(self, fd):
        """Return the key associated to a given file descriptor.

        Parameters:
        fd -- file descriptor

        Returns:
        corresponding key, or None if not found
        """
        try:
            return self._fd_to_key[fd]
        except KeyError:
            return None


class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""

    def __init__(self):
        super().__init__()
        # File descriptors currently monitored for reading / writing.
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        if events & EVENT_READ:
            self._readers.add(key.fd)
        if events & EVENT_WRITE:
            self._writers.add(key.fd)
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        self._readers.discard(key.fd)
        self._writers.discard(key.fd)
        return key

    if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            # The writers are also passed as the "exceptional" set and
            # whatever comes back there is reported as writable.
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []
    else:
        _select = select.select

    def select(self, timeout=None):
        # Negative timeouts are clamped to 0 (non-blocking).
        timeout = None if timeout is None else max(timeout, 0)
        ready = []
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready
        r = set(r)
        w = set(w)
        for fd in r | w:
            events = 0
            if fd in r:
                events |= EVENT_READ
            if fd in w:
                events |= EVENT_WRITE

            key = self._key_from_fd(fd)
            if key:
                ready.append((key, events & key.events))
        return ready


class _PollLikeSelector(_BaseSelectorImpl):
    """Base class shared between poll, epoll and devpoll selectors."""
    # Subclasses set these to the poller factory and to the poller-specific
    # flags corresponding to EVENT_READ / EVENT_WRITE.
    _selector_cls = None
    _EVENT_READ = None
    _EVENT_WRITE = None

    def __init__(self):
        super().__init__()
        self._selector = self._selector_cls()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        poller_events = 0
        if events & EVENT_READ:
            poller_events |= self._EVENT_READ
        if events & EVENT_WRITE:
            poller_events |= self._EVENT_WRITE
        try:
            self._selector.register(key.fd, poller_events)
        except BaseException:
            # Roll back the dict entry on *any* failure, then re-raise.
            super().unregister(fileobj)
            raise
        return key

    def unregister(self, fileobj):
        key = super().unregister(fileobj)
        try:
            self._selector.unregister(key.fd)
        except OSError:
            # This can happen if the FD was closed since it
            # was registered.
            pass
        return key

    def modify(self, fileobj, events, data=None):
        try:
            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
        except KeyError:
            raise KeyError(f"{fileobj!r} is not registered") from None

        changed = False
        if events != key.events:
            selector_events = 0
            if events & EVENT_READ:
                selector_events |= self._EVENT_READ
            if events & EVENT_WRITE:
                selector_events |= self._EVENT_WRITE
            try:
                self._selector.modify(key.fd, selector_events)
            except BaseException:
                # Drop the dict entry on *any* failure, then re-raise.
                super().unregister(fileobj)
                raise
            changed = True
        if data != key.data:
            changed = True

        if changed:
            key = key._replace(events=events, data=data)
            self._fd_to_key[key.fd] = key
        return key

    def select(self, timeout=None):
        # This is shared between poll() and epoll().
        # epoll() has a different signature and handling of timeout parameter.
        if timeout is not None:
            if timeout <= 0:
                timeout = 0
            else:
                # poll() has a resolution of 1 millisecond, round away from
                # zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3)
        ready = []
        try:
            fd_event_list = self._selector.poll(timeout)
        except InterruptedError:
            return ready
        for fd, event in fd_event_list:
            events = 0
            # Any flag other than the "read" one counts as writable and any
            # flag other than the "write" one counts as readable; the result
            # is masked with the registered events below.
            if event & ~self._EVENT_READ:
                events |= EVENT_WRITE
            if event & ~self._EVENT_WRITE:
                events |= EVENT_READ

            key = self._key_from_fd(fd)
            if key:
                ready.append((key, events & key.events))
        return ready


if hasattr(select, 'poll'):

    class PollSelector(_PollLikeSelector):
        """Poll-based selector."""
        _selector_cls = select.poll
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT


if hasattr(select, 'epoll'):

    class EpollSelector(_PollLikeSelector):
        """Epoll-based selector."""
        _selector_cls = select.epoll
        _EVENT_READ = select.EPOLLIN
        _EVENT_WRITE = select.EPOLLOUT

        def fileno(self):
            return self._selector.fileno()

        def select(self, timeout=None):
            if timeout is None:
                timeout = -1
            elif timeout <= 0:
                timeout = 0
            else:
                # epoll_wait() has a resolution of 1 millisecond, round away
                # from zero to wait *at least* timeout seconds.
                timeout = math.ceil(timeout * 1e3) * 1e-3

            # epoll_wait() expects `maxevents` to be greater than zero;
            # we want to make sure that `select()` can be called when no
            # FD is registered.
            max_ev = max(len(self._fd_to_key), 1)

            ready = []
            try:
                fd_event_list = self._selector.poll(timeout, max_ev)
            except InterruptedError:
                return ready
            for fd, event in fd_event_list:
                events = 0
                if event & ~select.EPOLLIN:
                    events |= EVENT_WRITE
                if event & ~select.EPOLLOUT:
                    events |= EVENT_READ

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._selector.close()
            super().close()


if hasattr(select, 'devpoll'):

    class DevpollSelector(_PollLikeSelector):
        """Solaris /dev/poll selector."""
        _selector_cls = select.devpoll
        _EVENT_READ = select.POLLIN
        _EVENT_WRITE = select.POLLOUT

        def fileno(self):
            return self._selector.fileno()

        def close(self):
            self._selector.close()
            super().close()


if hasattr(select, 'kqueue'):

    class KqueueSelector(_BaseSelectorImpl):
        """Kqueue-based selector."""

        def __init__(self):
            super().__init__()
            self._selector = select.kqueue()

        def fileno(self):
            return self._selector.fileno()

        def register(self, fileobj, events, data=None):
            key = super().register(fileobj, events, data)
            try:
                # One kevent filter is added per requested direction.
                if events & EVENT_READ:
                    kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
                if events & EVENT_WRITE:
                    kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                        select.KQ_EV_ADD)
                    self._selector.control([kev], 0, 0)
            except BaseException:
                # Roll back the dict entry on *any* failure, then re-raise.
                super().unregister(fileobj)
                raise
            return key

        def unregister(self, fileobj):
            key = super().unregister(fileobj)
            if key.events & EVENT_READ:
                kev = select.kevent(key.fd, select.KQ_FILTER_READ,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # This can happen if the FD was closed since it
                    # was registered.
                    pass
            if key.events & EVENT_WRITE:
                kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
                                    select.KQ_EV_DELETE)
                try:
                    self._selector.control([kev], 0, 0)
                except OSError:
                    # See comment above.
                    pass
            return key

        def select(self, timeout=None):
            timeout = None if timeout is None else max(timeout, 0)
            # An FD registered for both reading and writing has two filters,
            # each reported as its own kevent, so up to two events per FD
            # must be requested to avoid truncating the result.
            # With max_ev == 0 kqueue returns immediately and ignores the
            # timeout; request at least one event so that select() still
            # waits for `timeout` when no FD is registered (bpo-29255).
            max_ev = max(2 * len(self._fd_to_key), 1)
            ready = []
            try:
                kev_list = self._selector.control(None, max_ev, timeout)
            except InterruptedError:
                return ready
            for kev in kev_list:
                fd = kev.ident
                flag = kev.filter
                events = 0
                if flag == select.KQ_FILTER_READ:
                    events |= EVENT_READ
                if flag == select.KQ_FILTER_WRITE:
                    events |= EVENT_WRITE

                key = self._key_from_fd(fd)
                if key:
                    ready.append((key, events & key.events))
            return ready

        def close(self):
            self._selector.close()
            super().close()


# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector