# -*- Mode: Python -*-
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
#   Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
than one thing at a time".  Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. It's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""

import select
import socket
import sys
import time
import warnings

import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
     errorcode

warnings.warn(
    'The asyncore module is deprecated. '
    'The recommended replacement is asyncio',
    DeprecationWarning,
    stacklevel=2)


# errno values that send()/recv()/readwrite() treat as "the peer is gone":
# they lead to handle_close() instead of being reported as errors.
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF})

# Keep an already existing global channel map (e.g. when this code is
# re-executed in the same namespace); otherwise start with an empty one.
try:
    socket_map
except NameError:
    socket_map = {}

def _strerror(err):
    """Return a human-readable message for the errno value *err*.

    Uses os.strerror(); if that fails, falls back to the symbolic name
    from errno.errorcode, and finally to a generic "Unknown error" text.
    """
    try:
        return os.strerror(err)
    except (ValueError, OverflowError, NameError):
        if err in errorcode:
            return errorcode[err]
        return "Unknown error %s" %err

class ExitNow(Exception):
    """Exception that the event dispatch helpers always let propagate."""
    pass

# Exceptions that must never be swallowed by the dispatch helpers below.
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)

def read(obj):
    """Deliver a read event to *obj*.

    Exceptions in _reraised_exceptions propagate; anything else raised by
    obj.handle_read_event() is routed to obj.handle_error().
    """
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()

def write(obj):
    """Deliver a write event to *obj*.

    Exceptions in _reraised_exceptions propagate; anything else raised by
    obj.handle_write_event() is routed to obj.handle_error().
    """
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()

def _exception(obj):
    """Deliver an exceptional-condition event to *obj*.

    Exceptions in _reraised_exceptions propagate; anything else raised by
    obj.handle_expt_event() is routed to obj.handle_error().
    """
    try:
        obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()

def readwrite(obj, flags):
    """Dispatch the select.poll() event mask *flags* to *obj*.

    POLLIN, POLLOUT and POLLPRI trigger the read, write and exceptional
    event handlers respectively; POLLHUP, POLLERR or POLLNVAL trigger
    obj.handle_close().  An OSError whose errno is in _DISCONNECTED is
    treated as a close; any other non-reraised exception goes to
    obj.handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
    except OSError as e:
        if e.errno not in _DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()

def poll(timeout=0.0, map=None):
    """Run one select.select() pass over the channels in *map*.

    *map* maps file descriptors to channel objects and defaults to the
    global socket_map.  Each channel is asked whether it is readable()
    and writable(); the resulting descriptor lists are passed to
    select.select() with *timeout*, and the ready channels receive read,
    write and exceptional events.  If no channel wants any event the call
    just sleeps for *timeout* seconds.  An empty map is a no-op.
    """
    if map is None:
        map = socket_map
    if map:
        r = []; w = []; e = []
        # Iterate over a snapshot: readable()/writable() may alter the map.
        for fd, obj in list(map.items()):
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
                r.append(fd)
            # accepting sockets should not be writable
            if is_w and not obj.accepting:
                w.append(fd)
            if is_r or is_w:
                e.append(fd)
        if [] == r == w == e:
            time.sleep(timeout)
            return

        r, w, e = select.select(r, w, e, timeout)

        # A handler may have removed a channel from the map in the meantime,
        # hence the map.get() / None checks below.
        for fd in r:
            obj = map.get(fd)
            if obj is None:
                continue
            read(obj)

        for fd in w:
            obj = map.get(fd)
            if obj is None:
                continue
            write(obj)

        for fd in e:
            obj = map.get(fd)
            if obj is None:
                continue
            _exception(obj)

def poll2(timeout=0.0, map=None):
    """Run one select.poll() pass over the channels in *map*.

    Same role as poll(), but built on select.poll().  *timeout* is given
    in seconds (or None) and converted to the integer milliseconds that
    the poll object expects.  Ready channels are dispatched through
    readwrite().
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if map:
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                pollster.register(fd, flags)

        r = pollster.poll(timeout)
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)

poll3 = poll2                           # Alias for backward compatibility

def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until *map* is empty or *count* passes are done.

    timeout  -- passed to every poll call.
    use_poll -- use poll2() instead of poll() when select.poll exists.
    map      -- channel map to serve; defaults to the global socket_map.
    count    -- maximum number of polling passes; None means no limit.
    """
    if map is None:
        map = socket_map

    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1

class dispatcher:
    """Thin wrapper around a non-blocking socket registered in a channel map.

    The polling functions call the handle_*_event() methods, which in
    turn call the overridable handle_read(), handle_write(),
    handle_connect(), handle_accept()/handle_accepted(), handle_expt()
    and handle_close() hooks.  The default hooks only log a 'warning'
    message (which log_info() ignores by default).
    """

    # State flags; instances override them as the connection progresses.
    debug = False
    connected = False
    accepting = False
    connecting = False
    closing = False
    addr = None
    # Message types that log_info() silently drops.
    ignore_log_types = frozenset({'warning'})

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally around the existing socket *sock*.

        *map* is the channel map to register in (default: socket_map).
        A given socket is switched to non-blocking mode and registered
        immediately.  If getpeername() fails with ENOTCONN or EINVAL the
        channel is merely marked as not connected; any other OSError
        unregisters the channel and is re-raised.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(False)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.errno in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair; show it verbatim.
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map) by fileno."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default: its own map), if present.

        The stored file descriptor is reset to None in either case.
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a new non-blocking socket and register it as this channel."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(False)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the channel's socket and register it in *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR on the socket; an OSError is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except OSError:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return True if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        When os.name is 'nt' the backlog is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start connecting to *address* using connect_ex().

        If the connection is still in progress the method just returns and
        the connect is completed later by handle_connect_event().  If the
        socket is already connected, handle_connect_event() runs at once.
        Any other error code raises OSError.
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if (err in (EINPROGRESS, EALREADY, EWOULDBLOCK)
                or (err == EINVAL and os.name == 'nt')):
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # Not every error code has an entry in errno.errorcode; fall
            # back to _strerror() so callers always get an OSError rather
            # than a KeyError.
            raise OSError(err, errorcode.get(err, _strerror(err)))

    def accept(self):
        """Accept a pending connection.

        Returns a (socket, address) pair, or None when no connection could
        be accepted right now (EWOULDBLOCK, ECONNABORTED, EAGAIN, or a
        TypeError from unpacking the accept() result).  Other OSErrors
        propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes actually sent.

        Returns 0 when the socket would block, and also when the peer has
        gone away (errno in _DISCONNECTED), in which case handle_close()
        is called first.  Other OSErrors propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            # EAGAIN is handled like EWOULDBLOCK, matching accept().
            if why.errno in (EWOULDBLOCK, EAGAIN):
                return 0
            elif why.errno in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        Returns b'' after calling handle_close() when the peer closed the
        connection (empty read) or the error is in _DISCONNECTED.  Other
        OSErrors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.errno in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Reset the state flags, unregister the channel, close the socket.

        ENOTCONN and EBADF raised while closing the socket are ignored.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.errno not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a low-level read event into the proper handler call."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises OSError if SO_ERROR reports a failure; otherwise calls
        handle_connect() and marks the channel as connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Translate a low-level write event into the proper handler call."""
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Handle an exceptional condition reported for the socket.

        A pending socket error closes the channel through handle_close();
        otherwise handle_expt() is called.
        """
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled, then close the channel.

        Must be called from within an except block: compact_traceback()
        requires an active traceback.
        """
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Hook for an exceptional event; the default only logs a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Hook for readable data; the default only logs a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Hook for a writable socket; the default only logs a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Hook for a completed connect; the default only logs a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a connection and pass it on to handle_accepted()."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Hook for a new connection; the default closes *sock* and warns."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Hook for a closed connection; the default warns and calls close()."""
        self.log_info('unhandled close event', 'warning')
        self.close()

# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------

class dispatcher_with_send(dispatcher):
    """dispatcher variant that queues outgoing data in out_buffer."""

    def __init__(self, sock=None, map=None):
        """Initialise the dispatcher and start with an empty output buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Send up to 65536 buffered bytes and drop what was sent."""
        num_sent = dispatcher.send(self, self.out_buffer[:65536])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        """Flush buffered data when the socket becomes writable."""
        self.initiate_send()

    def writable(self):
        """Return a true value if not yet connected or if data is buffered.

        The result is True or the buffer length, so it is meant to be used
        for its truthiness only.
        """
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Append *data* to the output buffer and try to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()

# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------

def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), exc_type, exc_value, info), where the
    first item describes the innermost traceback frame and *info* is a
    string of '[file|function|line]' entries, one per frame.

    Raises AssertionError if there is no active traceback.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")
    while tb:
        tbinfo.append((
            tb.tb_frame.f_code.co_filename,
            tb.tb_frame.f_code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    return (file, function, line), t, v, info

def close_all(map=None, ignore_all=False):
    """Close every channel in *map* (default: socket_map), then clear it.

    An OSError with errno EBADF is always ignored.  Other errors are
    re-raised unless *ignore_all* is true; ExitNow, KeyboardInterrupt and
    SystemExit are always re-raised.
    """
    if map is None:
        map = socket_map
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.errno == EBADF:
                pass
            elif not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except:
            if not ignore_all:
                raise
    map.clear()

# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't
# meant for doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o?  [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...

if os.name == 'posix':
    class file_wrapper:
        """Make a file descriptor look enough like a socket for asyncore."""
        # Here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        # The passed fd is automatically os.dup()'d

        # Class-level default so that __del__() and close() stay safe even
        # when os.dup() raised in __init__ and the instance attribute was
        # never assigned.
        fd = -1

        def __init__(self, fd):
            """Duplicate *fd* and wrap the duplicate."""
            self.fd = os.dup(fd)

        def __del__(self):
            """Warn about a wrapper that was never closed, then close it."""
            if self.fd >= 0:
                warnings.warn("unclosed file %r" % self, ResourceWarning,
                              source=self)
            self.close()

        def recv(self, *args):
            """Read from the descriptor; arguments are passed to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments are passed to os.write()."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):
            """Emulate only the SO_ERROR query, always reporting 0.

            Any other request raises NotImplementedError.
            """
            if (level == socket.SOL_SOCKET and
                optname == socket.SO_ERROR and
                not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

        # File-style aliases for the socket-style methods.
        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor; later calls do nothing."""
            if self.fd < 0:
                return
            fd = self.fd
            # Mark as closed first so a failing os.close() is not retried.
            self.fd = -1
            os.close(fd)

        def fileno(self):
            """Return the wrapped descriptor (-1 once closed)."""
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher for a file descriptor or an object with fileno()."""

        def __init__(self, fd, map=None):
            """Wrap *fd* and register the channel in *map*.

            *fd* may be an integer descriptor or any object providing
            fileno().  The given descriptor is switched to non-blocking
            mode.
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)

        def set_file(self, fd):
            """Install a file_wrapper around *fd* as the channel's socket."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()