"""Generic socket server classes.

This module tries to capture the various aspects of defining a server:

For socket-based servers:

- address family:
        - AF_INET{,6}: IP (Internet Protocol) sockets (default)
        - AF_UNIX: Unix domain sockets
        - others, e.g. AF_DECNET are conceivable (see <socket.h>)
- socket type:
        - SOCK_STREAM (reliable stream, e.g. TCP)
        - SOCK_DGRAM (datagrams, e.g. UDP)

For request-based servers (including socket-based):

- client address verification before further looking at the request
        (This is actually a hook for any processing that needs to look
         at the request before anything else, e.g. logging)
- how to handle multiple requests:
        - synchronous (one request is handled at a time)
        - forking (each request is handled by a new process)
        - threading (each request is handled by a new thread)

The classes in this module favor the server type that is simplest to
write: a synchronous TCP/IP server. This is bad class design, but
saves some typing. (There's also the issue that a deep class hierarchy
slows down method lookups.)

There are five classes in an inheritance diagram, four of which represent
synchronous servers of four types:

        +------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+

Note that UnixDatagramServer derives from UDPServer, not from
UnixStreamServer -- the only difference between an IP and a Unix
stream server is the address family, which is simply repeated in both
Unix server classes.

Forking and threading versions of each type of server can be created
using the ForkingMixIn and ThreadingMixIn mix-in classes.
For instance, a threading UDP server class is created as follows:

        class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass

The mix-in class must come first, since it overrides a method defined
in UDPServer! Setting the various member variables also changes
the behavior of the underlying server mechanism.

To implement a service, you must derive a class from
BaseRequestHandler and redefine its handle() method. You can then run
various versions of the service by combining one of the server classes
with your request handler class.

The request handler class must be different for datagram or stream
services. This can be hidden by using the request handler
subclasses StreamRequestHandler or DatagramRequestHandler.

Of course, you still have to use your head!

For instance, it makes no sense to use a forking server if the service
contains state in memory that can be modified by requests (since the
modifications in the child process would never reach the initial state
kept in the parent process and passed to each child). In this case,
you can use a threading server, but you will probably have to use
locks to prevent two requests that arrive nearly simultaneously from
applying conflicting changes to the server state.

On the other hand, if you are building e.g. an HTTP server, where all
data is stored externally (e.g. in the file system), a synchronous
class will essentially render the service "deaf" while one request is
being handled -- which may be for a very long time if a client is slow
to read all the data it has requested. Here a threading or forking
server is appropriate.

In some cases, it may be appropriate to process part of a request
synchronously, but to finish processing in a forked child depending on
the request data. This can be implemented by using a synchronous
server and doing an explicit fork in the request handler class
handle() method.
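
As a minimal sketch of the combination described above (a mix-in based
server class plus a request handler class), assuming a loopback TCP
socket is available. UpperCaseHandler and upper_once are illustrative
names and not part of this module:

```python
import socket
import socketserver
import threading


class UpperCaseHandler(socketserver.StreamRequestHandler):
    # Hypothetical handler: read one chunk and reply with it in upper case.
    def handle(self):
        data = self.request.recv(1024)
        self.wfile.write(data.upper())


def upper_once(message):
    # Port 0 asks the OS for a free port; leaving the with-block calls
    # server_close(), which also joins the handler threads.
    address = ('127.0.0.1', 0)
    with socketserver.ThreadingTCPServer(address, UpperCaseHandler) as server:
        worker = threading.Thread(target=server.serve_forever)
        worker.start()
        try:
            with socket.create_connection(server.server_address) as client:
                client.sendall(message)
                # The server closes the connection after handle() returns,
                # so reading until EOF collects the whole reply.
                with client.makefile('rb') as reply_file:
                    reply = reply_file.read()
        finally:
            # shutdown() must be called from a thread other than the one
            # running serve_forever(), otherwise it deadlocks.
            server.shutdown()
            worker.join()
    return reply
```

Swapping ThreadingTCPServer for TCPServer in this sketch would give the
synchronous variant with the same handler class.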

Another approach to handling multiple simultaneous requests in an
environment that supports neither threads nor fork (or where these are
too expensive or inappropriate for the service) is to maintain an
explicit table of partially finished requests and to use a selector to
decide which request to work on next (or whether to handle a new
incoming request). This is particularly important for stream services
where each client can potentially be connected for a long time (if
threads or subprocesses cannot be used).

Future work:
- Standard classes for Sun RPC (which uses either UDP or TCP)
- Standard mix-in classes to implement various authentication
  and encryption schemes

XXX Open problems:
- What to do with out-of-band data?

BaseServer:
- split generic "request" functionality out into BaseServer class.
  Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org>

  example: read entries from a SQL database (requires overriding
  get_request() to return a table entry from the database).
  entry is processed by a RequestHandlerClass.
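
A rough sketch of that BaseServer idea, with an in-memory list of rows
standing in for the SQL database (an assumption made only to keep the
example self-contained). TableServer, RowHandler and process_all are
hypothetical names and not part of this module:

```python
import collections
import socketserver


class RowHandler(socketserver.BaseRequestHandler):
    # Hypothetical handler: each "request" is one table row (a dict).
    def handle(self):
        self.server.processed.append(self.request['name'].upper())


class TableServer(socketserver.BaseServer):
    # Hypothetical non-socket server: requests come from a queue of rows.
    def __init__(self, rows, handler_class):
        super().__init__(None, handler_class)
        self.pending = collections.deque(enumerate(rows))
        self.processed = []

    def get_request(self):
        # Must return (request, client_address); the row index stands in
        # for the client address here.
        index, row = self.pending.popleft()
        return row, index


def process_all(rows):
    server = TableServer(rows, RowHandler)
    while server.pending:
        request, client_address = server.get_request()
        if server.verify_request(request, client_address):
            # process_request() calls finish_request(), which instantiates
            # the handler class, and then shutdown_request().
            server.process_request(request, client_address)
    return server.processed
```

The loop drives the server by hand because handle_request() and
serve_forever() expect a selectable object with a socket, which this
sketch does not provide.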

"""

# Author of the BaseServer patch: Luke Kenneth Casson Leighton

__version__ = "0.4"


import socket
import selectors
import os
import sys
import threading
from io import BufferedIOBase
from time import monotonic as time

__all__ = ["BaseServer", "TCPServer", "UDPServer",
           "ThreadingUDPServer", "ThreadingTCPServer",
           "BaseRequestHandler", "StreamRequestHandler",
           "DatagramRequestHandler", "ThreadingMixIn"]
if hasattr(os, "fork"):
    __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])
    if hasattr(os, "fork"):
        __all__.extend(["ForkingUnixStreamServer", "ForkingUnixDatagramServer"])

# poll/select have the advantage of not requiring any extra file descriptor,
# contrary to epoll/kqueue (also, they require a single syscall).
if hasattr(selectors, 'PollSelector'):
    _ServerSelector = selectors.PollSelector
else:
    _ServerSelector = selectors.SelectSelector


class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for selector

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - service_actions()
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address
    - allow_reuse_port

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor. May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def service_actions(self):
        """Called by the serve_forever() loop.

        May be overridden by a subclass / Mixin to implement any code that
        needs to be run during the loop.
        """
        pass

    # The distinction between handling, getting, processing and finishing a
    # request is fairly arbitrary. Remember:
    #
    # - handle_request() is the top-level call. It calls selector.select(),
    #   get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process or create a
    #   new thread to finish the request
    # - finish_request() instantiates the request handler class; this
    #   constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        if timeout is not None:
            deadline = time() + timeout

        # Wait until a request arrives or the timeout expires - the loop is
        # necessary to accommodate early wakeups due to EINTR.
        with _ServerSelector() as selector:
            selector.register(self, selectors.EVENT_READ)

            while True:
                if selector.select(timeout):
                    return self._handle_request_noblock()
                else:
                    if timeout is not None:
                        timeout = deadline - time()
                        if timeout < 0:
                            return self.handle_timeout()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request. May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully. May be overridden.

        The default is to print a traceback and continue.

        """
        print('-'*40, file=sys.stderr)
        print('Exception occurred during processing of request from',
            client_address, file=sys.stderr)
        import traceback
        traceback.print_exc()
        print('-'*40, file=sys.stderr)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.server_close()


class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for selector

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address
    - allow_reuse_port

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    allow_reuse_port = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor. May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address and hasattr(socket, "SO_REUSEADDR"):
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if self.allow_reuse_port and hasattr(socket, "SO_REUSEPORT"):
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by selector.

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            # Explicitly shut down. socket.close() merely releases
            # the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except OSError:
            pass  # some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()


class UDPServer(TCPServer):

    """UDP server class."""

    allow_reuse_address = False

    allow_reuse_port = False

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass

if hasattr(os, "fork"):
    class ForkingMixIn:
        """Mix-in class to handle each request in a new process."""

        timeout = 300
        active_children = None
        max_children = 40
        # If true, server_close() waits until all child processes complete.
        block_on_close = True

        def collect_children(self, *, blocking=False):
            """Internal routine to wait for children that have exited."""
            if self.active_children is None:
                return

            # If we're above the max number of children, wait and reap them until
            # we go back below threshold. Note that we use waitpid(-1) below to be
            # able to collect children in size(<defunct children>) syscalls instead
            # of size(<children>): the downside is that this might reap children
            # which we didn't spawn, which is why we only resort to this when we're
            # above max_children.
            while len(self.active_children) >= self.max_children:
                try:
                    pid, _ = os.waitpid(-1, 0)
                    self.active_children.discard(pid)
                except ChildProcessError:
                    # we don't have any children, we're done
                    self.active_children.clear()
                except OSError:
                    break

            # Now reap all defunct children.
            for pid in self.active_children.copy():
                try:
                    flags = 0 if blocking else os.WNOHANG
                    pid, _ = os.waitpid(pid, flags)
                    # if the child hasn't exited yet, pid will be 0 and ignored by
                    # discard() below
                    self.active_children.discard(pid)
                except ChildProcessError:
                    # someone else reaped it
                    self.active_children.discard(pid)
                except OSError:
                    pass

        def handle_timeout(self):
            """Wait for zombies after self.timeout seconds of inactivity.

            May be extended, do not override.
            """
            self.collect_children()

        def service_actions(self):
            """Collect the zombie child processes regularly in the ForkingMixIn.

            service_actions is called in the BaseServer's serve_forever loop.
            """
            self.collect_children()

        def process_request(self, request, client_address):
            """Fork a new subprocess to process the request."""
            pid = os.fork()
            if pid:
                # Parent process
                if self.active_children is None:
                    self.active_children = set()
                self.active_children.add(pid)
                self.close_request(request)
                return
            else:
                # Child process.
                # This must never return, hence os._exit()!
                status = 1
                try:
                    self.finish_request(request, client_address)
                    status = 0
                except Exception:
                    self.handle_error(request, client_address)
                finally:
                    try:
                        self.shutdown_request(request)
                    finally:
                        os._exit(status)

        def server_close(self):
            super().server_close()
            self.collect_children(blocking=self.block_on_close)


class _Threads(list):
    """
    Joinable list of all non-daemon threads.
    """
    def append(self, thread):
        self.reap()
        if thread.daemon:
            return
        super().append(thread)

    def pop_all(self):
        self[:], result = [], self[:]
        return result

    def join(self):
        for thread in self.pop_all():
            thread.join()

    def reap(self):
        self[:] = (thread for thread in self if thread.is_alive())


class _NoThreads:
    """
    Degenerate version of _Threads.
    """
    def append(self, thread):
        pass

    def join(self):
        pass


class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False
    # If true, server_close() waits until all non-daemonic threads terminate.
    block_on_close = True
    # Threads object
    # used by server_close() to wait for all threads completion.
    _threads = _NoThreads()

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        if self.block_on_close:
            vars(self).setdefault('_threads', _Threads())
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        self._threads.append(t)
        t.start()

    def server_close(self):
        super().server_close()
        self._threads.join()


if hasattr(os, "fork"):
    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

if hasattr(socket, 'AF_UNIX'):

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX

    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass

    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass

    if hasattr(os, "fork"):
        class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass

        class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled. The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method. To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server. Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass


# The following two classes make it possible to use the same service
# class for stream or datagram servers.
# Each class sets up these instance variables:
# - rfile: a file object from which the request is read
# - wfile: a file object to which the reply is written
# When the handle() method returns, wfile is flushed properly


class StreamRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for stream sockets."""

    # Default buffer sizes for rfile, wfile.
    # We default rfile to buffered because otherwise it could be
    # really slow for large data (a getc() call per byte); we make
    # wfile unbuffered because (a) often after a write() we want to
    # read and we need to flush the line; (b) big writes to unbuffered
    # files are typically optimized by stdio even when big reads
    # aren't.
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # Disable Nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        if self.wbufsize == 0:
            self.wfile = _SocketWriter(self.connection)
        else:
            self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()

class _SocketWriter(BufferedIOBase):
    """Simple writable BufferedIOBase implementation for a socket

    Does not hold data in a buffer, avoiding any need to call flush()."""

    def __init__(self, sock):
        self._sock = sock

    def writable(self):
        return True

    def write(self, b):
        self._sock.sendall(b)
        with memoryview(b) as view:
            return view.nbytes

    def fileno(self):
        return self._sock.fileno()

class DatagramRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for datagram sockets."""

    def setup(self):
        from io import BytesIO
        self.packet, self.socket = self.request
        self.rfile = BytesIO(self.packet)
        self.wfile = BytesIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)