1"""Generic socket server classes.
2
3This module tries to capture the various aspects of defining a server:
4
5For socket-based servers:
6
7- address family:
8        - AF_INET{,6}: IP (Internet Protocol) sockets (default)
9        - AF_UNIX: Unix domain sockets
10        - others, e.g. AF_DECNET, are conceivable (see <socket.h>)
11- socket type:
12        - SOCK_STREAM (reliable stream, e.g. TCP)
13        - SOCK_DGRAM (datagrams, e.g. UDP)
14
15For request-based servers (including socket-based):
16
17- client address verification before further looking at the request
18        (This is actually a hook for any processing that needs to look
19         at the request before anything else, e.g. logging)
20- how to handle multiple requests:
21        - synchronous (one request is handled at a time)
22        - forking (each request is handled by a new process)
23        - threading (each request is handled by a new thread)
24
25The classes in this module favor the server type that is simplest to
26write: a synchronous TCP/IP server.  This is bad class design, but
27saves some typing.  (There's also the issue that a deep class hierarchy
28slows down method lookups.)
29
30There are five classes in an inheritance diagram, four of which represent
31synchronous servers of four types:
32
33        +------------+
34        | BaseServer |
35        +------------+
36              |
37              v
38        +-----------+        +------------------+
39        | TCPServer |------->| UnixStreamServer |
40        +-----------+        +------------------+
41              |
42              v
43        +-----------+        +--------------------+
44        | UDPServer |------->| UnixDatagramServer |
45        +-----------+        +--------------------+
46
47Note that UnixDatagramServer derives from UDPServer, not from
48UnixStreamServer -- the only difference between an IP and a Unix
49stream server is the address family, which is simply repeated in both
50unix server classes.
51
52Forking and threading versions of each type of server can be created
53using the ForkingMixIn and ThreadingMixIn mix-in classes.  For
54instance, a threading UDP server class is created as follows:
55
56        class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
57
58The mix-in class must come first, since it overrides a method defined
59in UDPServer! Setting the various member variables also changes
60the behavior of the underlying server mechanism.
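
A minimal sketch of why the base-class order matters. RightOrder,
WrongOrder and process_request_owner are hypothetical names used only
for this illustration; the check walks the method resolution order to
see which class supplies process_request().

```python
import socketserver


class RightOrder(socketserver.ThreadingMixIn, socketserver.UDPServer):
    pass


class WrongOrder(socketserver.UDPServer, socketserver.ThreadingMixIn):
    pass


def process_request_owner(server_class):
    # Walk the method resolution order and report which class supplies
    # the process_request() that instances will actually call.
    for klass in server_class.__mro__:
        if "process_request" in vars(klass):
            return klass.__name__
    return None
```

With the mix-in listed first, its process_request() is found before
the synchronous one in BaseServer; with the order reversed, the
synchronous version wins and no thread is ever started.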
61
62To implement a service, you must derive a class from
63BaseRequestHandler and redefine its handle() method.  You can then run
64various versions of the service by combining one of the server classes
65with your request handler class.
66
67The request handler class must be different for datagram or stream
68services.  This can be hidden by using the request handler
69subclasses StreamRequestHandler or DatagramRequestHandler.
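
As an illustration only, the sketch below shares one handle() body
between a stream and a datagram service by relying on rfile and wfile.
UpperMixin, the two handler classes and ask() are hypothetical names,
and the loopback address with port 0 is an assumption made so the
example can pick any free port.

```python
import socket
import socketserver
import threading


class UpperMixin:
    # Hypothetical helper: the same handle() body for both transports.
    def handle(self):
        self.wfile.write(self.rfile.readline().upper())


class UpperStreamHandler(UpperMixin, socketserver.StreamRequestHandler):
    pass


class UpperDatagramHandler(UpperMixin, socketserver.DatagramRequestHandler):
    pass


def ask(server_class, handler_class, sock_type, payload):
    # Serve exactly one request in a helper thread and return the reply.
    with server_class(("127.0.0.1", 0), handler_class) as server:
        worker = threading.Thread(target=server.handle_request)
        worker.start()
        with socket.socket(socket.AF_INET, sock_type) as client:
            client.connect(server.server_address)
            client.sendall(payload)
            reply = client.recv(1024)
        worker.join()
    return reply
```

Calling ask(socketserver.TCPServer, UpperStreamHandler,
socket.SOCK_STREAM, b"hello\n") and the UDPServer equivalent runs the
same service over both transports.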
70
71Of course, you still have to use your head!
72
73For instance, it makes no sense to use a forking server if the service
74contains state in memory that can be modified by requests (since the
75modifications in the child process would never reach the initial state
76kept in the parent process and passed to each child).  In this case,
77you can use a threading server, but you will probably have to use
78locks to prevent two requests that arrive nearly simultaneously from
79applying conflicting changes to the server state.
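
One possible shape for such locking, as a sketch rather than a
recommendation: CountingServer, CountingHandler, count_requests and
the hits/hits_lock attributes are hypothetical, and the state being
protected is just a request counter kept on the server object.

```python
import socket
import socketserver
import threading


class CountingServer(socketserver.ThreadingTCPServer):
    def __init__(self, *args, **kwargs):
        # Shared state lives on the server; every handler thread sees it.
        self.hits = 0
        self.hits_lock = threading.Lock()
        super().__init__(*args, **kwargs)


class CountingHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # The lock makes the read-modify-write of hits atomic.
        with self.server.hits_lock:
            self.server.hits += 1
            current = self.server.hits
        self.wfile.write(b"%d\n" % current)


def count_requests(n):
    with CountingServer(("127.0.0.1", 0), CountingHandler) as server:
        loop = threading.Thread(target=server.serve_forever,
                                kwargs={"poll_interval": 0.05})
        loop.start()
        try:
            for _ in range(n):
                with socket.create_connection(server.server_address) as client:
                    client.makefile("rb").readline()
        finally:
            # shutdown() must be called from a thread other than the one
            # running serve_forever().
            server.shutdown()
            loop.join()
        return server.hits
```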
80
81On the other hand, if you are building e.g. an HTTP server, where all
82data is stored externally (e.g. in the file system), a synchronous
83class will essentially render the service "deaf" while one request is
84being handled -- which may be for a very long time if a client is slow
85to read all the data it has requested.  Here a threading or forking
86server is appropriate.
87
88In some cases, it may be appropriate to process part of a request
89synchronously, but to finish processing in a forked child depending on
90the request data.  This can be implemented by using a synchronous
91server and doing an explicit fork in the request handler class
92handle() method.
93
94Another approach to handling multiple simultaneous requests in an
95environment that supports neither threads nor fork (or where these are
96too expensive or inappropriate for the service) is to maintain an
97explicit table of partially finished requests and to use a selector to
98decide which request to work on next (or whether to handle a new
99incoming request).  This is particularly important for stream services
100where each client can potentially be connected for a long time (if
101threads or subprocesses cannot be used).
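
The selector approach could look roughly like the sketch below, which
does not use the classes in this module. serve_with_selector and demo
are hypothetical names, and the "request" is assumed to be a single
newline-terminated line that may arrive in pieces.

```python
import selectors
import socket
import threading


def serve_with_selector(listener, expected_clients):
    partial = {}   # connection -> bytes received so far
    finished = 0
    with selectors.DefaultSelector() as selector:
        selector.register(listener, selectors.EVENT_READ)
        while finished < expected_clients:
            for key, _events in selector.select():
                sock = key.fileobj
                if sock is listener:
                    # A new incoming request: add it to the table.
                    conn, _addr = listener.accept()
                    partial[conn] = b""
                    selector.register(conn, selectors.EVENT_READ)
                    continue
                chunk = sock.recv(1024)
                partial[sock] += chunk
                if chunk and not partial[sock].endswith(b"\n"):
                    continue  # still incomplete; leave it in the table
                reply = partial.pop(sock).upper()
                if chunk:
                    sock.sendall(reply)
                selector.unregister(sock)
                sock.close()
                finished += 1


def demo():
    with socket.create_server(("127.0.0.1", 0)) as listener:
        loop = threading.Thread(target=serve_with_selector,
                                args=(listener, 2))
        loop.start()
        address = listener.getsockname()
        with socket.create_connection(address) as slow:
            with socket.create_connection(address) as fast:
                slow.sendall(b"par")          # first half of a request
                fast.sendall(b"whole\n")      # a complete request
                fast_reply = fast.makefile("rb").readline()
                slow.sendall(b"tial\n")       # second half arrives later
                slow_reply = slow.makefile("rb").readline()
        loop.join()
    return fast_reply, slow_reply
```

In demo() the fast client is answered while the slow client's request
is still only partly received, all on a single server thread.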
102
103Future work:
104- Standard classes for Sun RPC (which uses either UDP or TCP)
105- Standard mix-in classes to implement various authentication
106  and encryption schemes
107
108XXX Open problems:
109- What to do with out-of-band data?
110
111BaseServer:
112- split generic "request" functionality out into BaseServer class.
113  Copyright (C) 2000  Luke Kenneth Casson Leighton <lkcl@samba.org>
114
115  example: read entries from a SQL database (requires overriding
116  get_request() to return a table entry from the database).
117  entry is processed by a RequestHandlerClass.
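
  A rough sketch of that idea, with an in-memory deque standing in for
  the database table. TableServer, RowHandler and drain() are
  hypothetical; drain() is used because handle_request() in this file
  reads self.socket and registers the server with a selector, which a
  socket-less subclass does not provide.

```python
import collections
import socketserver


class RowHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # self.request is whatever get_request() returned: here, one row.
        self.server.processed.append(self.request.upper())


class TableServer(socketserver.BaseServer):
    def __init__(self, rows, RequestHandlerClass):
        super().__init__("in-memory-table", RequestHandlerClass)
        self.rows = collections.deque(rows)
        self.processed = []

    def get_request(self):
        # Stand-in for fetching the next table entry from a database.
        return self.rows.popleft(), self.server_address

    def drain(self):
        # Hypothetical driver loop: get, verify, then process each row.
        while self.rows:
            request, client_address = self.get_request()
            if self.verify_request(request, client_address):
                self.process_request(request, client_address)
        return self.processed
```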
118
119"""
120
121# Author of the BaseServer patch: Luke Kenneth Casson Leighton
122
123__version__ = "0.4"
124
125
126import socket
127import selectors
128import os
129import sys
130import threading
131from io import BufferedIOBase
132from time import monotonic as time
133
134__all__ = ["BaseServer", "TCPServer", "UDPServer",
135           "ThreadingUDPServer", "ThreadingTCPServer",
136           "BaseRequestHandler", "StreamRequestHandler",
137           "DatagramRequestHandler", "ThreadingMixIn"]
138if hasattr(os, "fork"):
139    __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"])
140if hasattr(socket, "AF_UNIX"):
141    __all__.extend(["UnixStreamServer","UnixDatagramServer",
142                    "ThreadingUnixStreamServer",
143                    "ThreadingUnixDatagramServer"])
144    if hasattr(os, "fork"):
145        __all__.extend(["ForkingUnixStreamServer", "ForkingUnixDatagramServer"])
146
147# poll/select have the advantage of not requiring any extra file descriptor,
148# unlike epoll/kqueue (also, they require a single syscall).
149if hasattr(selectors, 'PollSelector'):
150    _ServerSelector = selectors.PollSelector
151else:
152    _ServerSelector = selectors.SelectSelector
153
154
155class BaseServer:
156
157    """Base class for server classes.
158
159    Methods for the caller:
160
161    - __init__(server_address, RequestHandlerClass)
162    - serve_forever(poll_interval=0.5)
163    - shutdown()
164    - handle_request()  # if you do not use serve_forever()
165    - fileno() -> int   # for selector
166
167    Methods that may be overridden:
168
169    - server_bind()
170    - server_activate()
171    - get_request() -> request, client_address
172    - handle_timeout()
173    - verify_request(request, client_address)
174    - server_close()
175    - process_request(request, client_address)
176    - shutdown_request(request)
177    - close_request(request)
178    - service_actions()
179    - handle_error()
180
181    Methods for derived classes:
182
183    - finish_request(request, client_address)
184
185    Class variables that may be overridden by derived classes or
186    instances:
187
188    - timeout
189    - address_family
190    - socket_type
191    - allow_reuse_address
192    - allow_reuse_port
193
194    Instance variables:
195
196    - RequestHandlerClass
197    - socket
198
199    """
200
201    timeout = None
202
203    def __init__(self, server_address, RequestHandlerClass):
204        """Constructor.  May be extended, do not override."""
205        self.server_address = server_address
206        self.RequestHandlerClass = RequestHandlerClass
207        self.__is_shut_down = threading.Event()
208        self.__shutdown_request = False
209
210    def server_activate(self):
211        """Called by constructor to activate the server.
212
213        May be overridden.
214
215        """
216        pass
217
218    def serve_forever(self, poll_interval=0.5):
219        """Handle one request at a time until shutdown.
220
221        Polls for shutdown every poll_interval seconds. Ignores
222        self.timeout. If you need to do periodic tasks, do them in
223        another thread.
224        """
225        self.__is_shut_down.clear()
226        try:
227            # XXX: Consider using another file descriptor or connecting to the
228            # socket to wake this up instead of polling. Polling reduces our
229            # responsiveness to a shutdown request and wastes cpu at all other
230            # times.
231            with _ServerSelector() as selector:
232                selector.register(self, selectors.EVENT_READ)
233
234                while not self.__shutdown_request:
235                    ready = selector.select(poll_interval)
236                    # bpo-35017: shutdown() called during select(), exit immediately.
237                    if self.__shutdown_request:
238                        break
239                    if ready:
240                        self._handle_request_noblock()
241
242                    self.service_actions()
243        finally:
244            self.__shutdown_request = False
245            self.__is_shut_down.set()
246
247    def shutdown(self):
248        """Stops the serve_forever loop.
249
250        Blocks until the loop has finished. This must be called while
251        serve_forever() is running in another thread, or it will
252        deadlock.
253        """
254        self.__shutdown_request = True
255        self.__is_shut_down.wait()
256
257    def service_actions(self):
258        """Called by the serve_forever() loop.
259
260        May be overridden by a subclass / Mixin to implement any code that
261        needs to be run during the loop.
262        """
263        pass
264
265    # The distinction between handling, getting, processing and finishing a
266    # request is fairly arbitrary.  Remember:
267    #
268    # - handle_request() is the top-level call.  It calls selector.select(),
269    #   get_request(), verify_request() and process_request()
270    # - get_request() is different for stream or datagram sockets
271    # - process_request() is the place that may fork a new process or create a
272    #   new thread to finish the request
273    # - finish_request() instantiates the request handler class; this
274    #   constructor will handle the request all by itself
275
276    def handle_request(self):
277        """Handle one request, possibly blocking.
278
279        Respects self.timeout.
280        """
281        # Support people who used socket.settimeout() to escape
282        # handle_request before self.timeout was available.
283        timeout = self.socket.gettimeout()
284        if timeout is None:
285            timeout = self.timeout
286        elif self.timeout is not None:
287            timeout = min(timeout, self.timeout)
288        if timeout is not None:
289            deadline = time() + timeout
290
291        # Wait until a request arrives or the timeout expires - the loop is
292        # necessary to accommodate early wakeups due to EINTR.
293        with _ServerSelector() as selector:
294            selector.register(self, selectors.EVENT_READ)
295
296            while True:
297                if selector.select(timeout):
298                    return self._handle_request_noblock()
299                else:
300                    if timeout is not None:
301                        timeout = deadline - time()
302                        if timeout < 0:
303                            return self.handle_timeout()
304
305    def _handle_request_noblock(self):
306        """Handle one request, without blocking.
307
308        I assume that selector.select() has returned that the socket is
309        readable before this function was called, so there should be no risk of
310        blocking in get_request().
311        """
312        try:
313            request, client_address = self.get_request()
314        except OSError:
315            return
316        if self.verify_request(request, client_address):
317            try:
318                self.process_request(request, client_address)
319            except Exception:
320                self.handle_error(request, client_address)
321                self.shutdown_request(request)
322            except:
323                self.shutdown_request(request)
324                raise
325        else:
326            self.shutdown_request(request)
327
328    def handle_timeout(self):
329        """Called if no new request arrives within self.timeout.
330
331        Overridden by ForkingMixIn.
332        """
333        pass
334
335    def verify_request(self, request, client_address):
336        """Verify the request.  May be overridden.
337
338        Return True if we should proceed with this request.
339
340        """
341        return True
342
343    def process_request(self, request, client_address):
344        """Call finish_request.
345
346        Overridden by ForkingMixIn and ThreadingMixIn.
347
348        """
349        self.finish_request(request, client_address)
350        self.shutdown_request(request)
351
352    def server_close(self):
353        """Called to clean up the server.
354
355        May be overridden.
356
357        """
358        pass
359
360    def finish_request(self, request, client_address):
361        """Finish one request by instantiating RequestHandlerClass."""
362        self.RequestHandlerClass(request, client_address, self)
363
364    def shutdown_request(self, request):
365        """Called to shut down and close an individual request."""
366        self.close_request(request)
367
368    def close_request(self, request):
369        """Called to clean up an individual request."""
370        pass
371
372    def handle_error(self, request, client_address):
373        """Handle an error gracefully.  May be overridden.
374
375        The default is to print a traceback and continue.
376
377        """
378        print('-'*40, file=sys.stderr)
379        print('Exception occurred during processing of request from',
380            client_address, file=sys.stderr)
381        import traceback
382        traceback.print_exc()
383        print('-'*40, file=sys.stderr)
384
385    def __enter__(self):
386        return self
387
388    def __exit__(self, *args):
389        self.server_close()
390
391
392class TCPServer(BaseServer):
393
394    """Base class for various socket-based server classes.
395
396    Defaults to synchronous IP stream (i.e., TCP).
397
398    Methods for the caller:
399
400    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
401    - serve_forever(poll_interval=0.5)
402    - shutdown()
403    - handle_request()  # if you don't use serve_forever()
404    - fileno() -> int   # for selector
405
406    Methods that may be overridden:
407
408    - server_bind()
409    - server_activate()
410    - get_request() -> request, client_address
411    - handle_timeout()
412    - verify_request(request, client_address)
413    - process_request(request, client_address)
414    - shutdown_request(request)
415    - close_request(request)
416    - handle_error()
417
418    Methods for derived classes:
419
420    - finish_request(request, client_address)
421
422    Class variables that may be overridden by derived classes or
423    instances:
424
425    - timeout
426    - address_family
427    - socket_type
428    - request_queue_size (only for stream sockets)
429    - allow_reuse_address
430    - allow_reuse_port
431
432    Instance variables:
433
434    - server_address
435    - RequestHandlerClass
436    - socket
437
438    """
439
440    address_family = socket.AF_INET
441
442    socket_type = socket.SOCK_STREAM
443
444    request_queue_size = 5
445
446    allow_reuse_address = False
447
448    allow_reuse_port = False
449
450    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
451        """Constructor.  May be extended, do not override."""
452        BaseServer.__init__(self, server_address, RequestHandlerClass)
453        self.socket = socket.socket(self.address_family,
454                                    self.socket_type)
455        if bind_and_activate:
456            try:
457                self.server_bind()
458                self.server_activate()
459            except:
460                self.server_close()
461                raise
462
463    def server_bind(self):
464        """Called by constructor to bind the socket.
465
466        May be overridden.
467
468        """
469        if self.allow_reuse_address and hasattr(socket, "SO_REUSEADDR"):
470            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
471        if self.allow_reuse_port and hasattr(socket, "SO_REUSEPORT"):
472            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
473        self.socket.bind(self.server_address)
474        self.server_address = self.socket.getsockname()
475
476    def server_activate(self):
477        """Called by constructor to activate the server.
478
479        May be overridden.
480
481        """
482        self.socket.listen(self.request_queue_size)
483
484    def server_close(self):
485        """Called to clean up the server.
486
487        May be overridden.
488
489        """
490        self.socket.close()
491
492    def fileno(self):
493        """Return socket file number.
494
495        Interface required by selector.
496
497        """
498        return self.socket.fileno()
499
500    def get_request(self):
501        """Get the request and client address from the socket.
502
503        May be overridden.
504
505        """
506        return self.socket.accept()
507
508    def shutdown_request(self, request):
509        """Called to shut down and close an individual request."""
510        try:
511            # Explicitly shut down.  socket.close() merely releases
512            # the socket and waits for GC to perform the actual close.
513            request.shutdown(socket.SHUT_WR)
514        except OSError:
515            pass  # some platforms may raise ENOTCONN here
516        self.close_request(request)
517
518    def close_request(self, request):
519        """Called to clean up an individual request."""
520        request.close()
521
522
523class UDPServer(TCPServer):
524
525    """UDP server class."""
526
527    allow_reuse_address = False
528
529    allow_reuse_port = False
530
531    socket_type = socket.SOCK_DGRAM
532
533    max_packet_size = 8192
534
535    def get_request(self):
536        data, client_addr = self.socket.recvfrom(self.max_packet_size)
537        return (data, self.socket), client_addr
538
539    def server_activate(self):
540        # No need to call listen() for UDP.
541        pass
542
543    def shutdown_request(self, request):
544        # No need to shutdown anything.
545        self.close_request(request)
546
547    def close_request(self, request):
548        # No need to close anything.
549        pass
550
551if hasattr(os, "fork"):
552    class ForkingMixIn:
553        """Mix-in class to handle each request in a new process."""
554
555        timeout = 300
556        active_children = None
557        max_children = 40
558        # If true, server_close() waits until all child processes complete.
559        block_on_close = True
560
561        def collect_children(self, *, blocking=False):
562            """Internal routine to wait for children that have exited."""
563            if self.active_children is None:
564                return
565
566            # If we're above the max number of children, wait and reap them until
567            # we go back below threshold. Note that we use waitpid(-1) below to be
568            # able to collect children in size(<defunct children>) syscalls instead
569            # of size(<children>): the downside is that this might reap children
570            # which we didn't spawn, which is why we only resort to this when we're
571            # above max_children.
572            while len(self.active_children) >= self.max_children:
573                try:
574                    pid, _ = os.waitpid(-1, 0)
575                    self.active_children.discard(pid)
576                except ChildProcessError:
577                    # we don't have any children, we're done
578                    self.active_children.clear()
579                except OSError:
580                    break
581
582            # Now reap all defunct children.
583            for pid in self.active_children.copy():
584                try:
585                    flags = 0 if blocking else os.WNOHANG
586                    pid, _ = os.waitpid(pid, flags)
587                    # if the child hasn't exited yet, pid will be 0 and ignored by
588                    # discard() below
589                    self.active_children.discard(pid)
590                except ChildProcessError:
591                    # someone else reaped it
592                    self.active_children.discard(pid)
593                except OSError:
594                    pass
595
596        def handle_timeout(self):
597            """Wait for zombies after self.timeout seconds of inactivity.
598
599            May be extended, do not override.
600            """
601            self.collect_children()
602
603        def service_actions(self):
604            """Collect the zombie child processes regularly in the ForkingMixIn.
605
606            service_actions is called in the BaseServer's serve_forever loop.
607            """
608            self.collect_children()
609
610        def process_request(self, request, client_address):
611            """Fork a new subprocess to process the request."""
612            pid = os.fork()
613            if pid:
614                # Parent process
615                if self.active_children is None:
616                    self.active_children = set()
617                self.active_children.add(pid)
618                self.close_request(request)
619                return
620            else:
621                # Child process.
622                # This must never return, hence os._exit()!
623                status = 1
624                try:
625                    self.finish_request(request, client_address)
626                    status = 0
627                except Exception:
628                    self.handle_error(request, client_address)
629                finally:
630                    try:
631                        self.shutdown_request(request)
632                    finally:
633                        os._exit(status)
634
635        def server_close(self):
636            super().server_close()
637            self.collect_children(blocking=self.block_on_close)
638
639
640class _Threads(list):
641    """
642    Joinable list of all non-daemon threads.
643    """
644    def append(self, thread):
645        self.reap()
646        if thread.daemon:
647            return
648        super().append(thread)
649
650    def pop_all(self):
651        self[:], result = [], self[:]
652        return result
653
654    def join(self):
655        for thread in self.pop_all():
656            thread.join()
657
658    def reap(self):
659        self[:] = (thread for thread in self if thread.is_alive())
660
661
662class _NoThreads:
663    """
664    Degenerate version of _Threads.
665    """
666    def append(self, thread):
667        pass
668
669    def join(self):
670        pass
671
672
673class ThreadingMixIn:
674    """Mix-in class to handle each request in a new thread."""
675
676    # Decides how threads will act upon termination of the
677    # main process
678    daemon_threads = False
679    # If true, server_close() waits until all non-daemonic threads terminate.
680    block_on_close = True
681    # Threads object
682    # used by server_close() to wait for all threads to complete.
683    _threads = _NoThreads()
684
685    def process_request_thread(self, request, client_address):
686        """Same as in BaseServer but as a thread.
687
688        In addition, exception handling is done here.
689
690        """
691        try:
692            self.finish_request(request, client_address)
693        except Exception:
694            self.handle_error(request, client_address)
695        finally:
696            self.shutdown_request(request)
697
698    def process_request(self, request, client_address):
699        """Start a new thread to process the request."""
700        if self.block_on_close:
701            vars(self).setdefault('_threads', _Threads())
702        t = threading.Thread(target = self.process_request_thread,
703                             args = (request, client_address))
704        t.daemon = self.daemon_threads
705        self._threads.append(t)
706        t.start()
707
708    def server_close(self):
709        super().server_close()
710        self._threads.join()
711
712
713if hasattr(os, "fork"):
714    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
715    class ForkingTCPServer(ForkingMixIn, TCPServer): pass
716
717class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
718class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
719
720if hasattr(socket, 'AF_UNIX'):
721
722    class UnixStreamServer(TCPServer):
723        address_family = socket.AF_UNIX
724
725    class UnixDatagramServer(UDPServer):
726        address_family = socket.AF_UNIX
727
728    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
729
730    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
731
732    if hasattr(os, "fork"):
733        class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
734
735        class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
736
737class BaseRequestHandler:
738
739    """Base class for request handler classes.
740
741    This class is instantiated for each request to be handled.  The
742    constructor sets the instance variables request, client_address
743    and server, and then calls the handle() method.  To implement a
744    specific service, all you need to do is to derive a class which
745    defines a handle() method.
746
747    The handle() method can find the request as self.request, the
748    client address as self.client_address, and the server (in case it
749    needs access to per-server information) as self.server.  Since a
750    separate instance is created for each request, the handle() method
751    can define other arbitrary instance variables.
752
753    """
754
755    def __init__(self, request, client_address, server):
756        self.request = request
757        self.client_address = client_address
758        self.server = server
759        self.setup()
760        try:
761            self.handle()
762        finally:
763            self.finish()
764
765    def setup(self):
766        pass
767
768    def handle(self):
769        pass
770
771    def finish(self):
772        pass
773
774
775# The following two classes make it possible to use the same service
776# class for stream or datagram servers.
777# Each class sets up these instance variables:
778# - rfile: a file object from which the request is read
779# - wfile: a file object to which the reply is written
780# When the handle() method returns, wfile is flushed properly
781
782
783class StreamRequestHandler(BaseRequestHandler):
784
785    """Define self.rfile and self.wfile for stream sockets."""
786
787    # Default buffer sizes for rfile, wfile.
788    # We default rfile to buffered because otherwise it could be
789    # really slow for large data (a getc() call per byte); we make
790    # wfile unbuffered because (a) often after a write() we want to
791    # read and we need to flush the line; (b) big writes to unbuffered
792    # files are typically optimized by stdio even when big reads
793    # aren't.
794    rbufsize = -1
795    wbufsize = 0
796
797    # A timeout to apply to the request socket, if not None.
798    timeout = None
799
800    # Disable nagle algorithm for this socket, if True.
801    # Use only when wbufsize != 0, to avoid small packets.
802    disable_nagle_algorithm = False
803
804    def setup(self):
805        self.connection = self.request
806        if self.timeout is not None:
807            self.connection.settimeout(self.timeout)
808        if self.disable_nagle_algorithm:
809            self.connection.setsockopt(socket.IPPROTO_TCP,
810                                       socket.TCP_NODELAY, True)
811        self.rfile = self.connection.makefile('rb', self.rbufsize)
812        if self.wbufsize == 0:
813            self.wfile = _SocketWriter(self.connection)
814        else:
815            self.wfile = self.connection.makefile('wb', self.wbufsize)
816
817    def finish(self):
818        if not self.wfile.closed:
819            try:
820                self.wfile.flush()
821            except OSError:
822                # A final socket error may have occurred here, such as
823                # the local error ECONNABORTED.
824                pass
825        self.wfile.close()
826        self.rfile.close()
827
828class _SocketWriter(BufferedIOBase):
829    """Simple writable BufferedIOBase implementation for a socket
830
831    Does not hold data in a buffer, avoiding any need to call flush()."""
832
833    def __init__(self, sock):
834        self._sock = sock
835
836    def writable(self):
837        return True
838
839    def write(self, b):
840        self._sock.sendall(b)
841        with memoryview(b) as view:
842            return view.nbytes
843
844    def fileno(self):
845        return self._sock.fileno()
846
847class DatagramRequestHandler(BaseRequestHandler):
848
849    """Define self.rfile and self.wfile for datagram sockets."""
850
851    def setup(self):
852        from io import BytesIO
853        self.packet, self.socket = self.request
854        self.rfile = BytesIO(self.packet)
855        self.wfile = BytesIO()
856
857    def finish(self):
858        self.socket.sendto(self.wfile.getvalue(), self.client_address)
859