• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# TODO: This module was deprecated and removed from CPython 3.12
2# Now it is a test-only helper. Any attempts to rewrite exising tests that
3# are using this module and remove it completely are appreciated!
4# See: https://github.com/python/cpython/issues/72719
5
6# -*- Mode: Python -*-
7#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
8#   Author: Sam Rushing <rushing@nightmare.com>
9
10# ======================================================================
11# Copyright 1996 by Sam Rushing
12#
13#                         All Rights Reserved
14#
15# Permission to use, copy, modify, and distribute this software and
16# its documentation for any purpose and without fee is hereby
17# granted, provided that the above copyright notice appear in all
18# copies and that both that copyright notice and this permission
19# notice appear in supporting documentation, and that the name of Sam
20# Rushing not be used in advertising or publicity pertaining to
21# distribution of the software without specific, written prior
22# permission.
23#
24# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
25# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
26# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
27# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
28# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
29# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
30# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
31# ======================================================================
32
33"""Basic infrastructure for asynchronous socket service clients and servers.
34
35There are only two ways to have a program on a single processor do "more
36than one thing at a time".  Multi-threaded programming is the simplest and
37most popular way to do it, but there is another very different technique,
38that lets you have nearly all the advantages of multi-threading, without
39actually using multiple threads. it's really only practical if your program
40is largely I/O bound. If your program is CPU bound, then pre-emptive
41scheduled threads are probably what you really need. Network servers are
42rarely CPU-bound, however.
43
44If your operating system supports the select() system call in its I/O
45library (and nearly all do), then you can use it to juggle multiple
46communication channels at once; doing other work while your I/O is taking
47place in the "background."  Although this strategy can seem strange and
48complex, especially at first, it is in many ways easier to understand and
49control than multi-threaded programming. The module documented here solves
50many of the difficult problems for you, making the task of building
51sophisticated high-performance network servers and clients a snap.
52"""
53
54import select
55import socket
56import sys
57import time
58import warnings
59
60import os
61from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
62     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
63     errorcode
64
65
66_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
67                           EBADF})
68
69try:
70    socket_map
71except NameError:
72    socket_map = {}
73
74def _strerror(err):
75    try:
76        return os.strerror(err)
77    except (ValueError, OverflowError, NameError):
78        if err in errorcode:
79            return errorcode[err]
80        return "Unknown error %s" %err
81
82class ExitNow(Exception):
83    pass
84
85_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
86
87def read(obj):
88    try:
89        obj.handle_read_event()
90    except _reraised_exceptions:
91        raise
92    except:
93        obj.handle_error()
94
95def write(obj):
96    try:
97        obj.handle_write_event()
98    except _reraised_exceptions:
99        raise
100    except:
101        obj.handle_error()
102
103def _exception(obj):
104    try:
105        obj.handle_expt_event()
106    except _reraised_exceptions:
107        raise
108    except:
109        obj.handle_error()
110
111def readwrite(obj, flags):
112    try:
113        if flags & select.POLLIN:
114            obj.handle_read_event()
115        if flags & select.POLLOUT:
116            obj.handle_write_event()
117        if flags & select.POLLPRI:
118            obj.handle_expt_event()
119        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
120            obj.handle_close()
121    except OSError as e:
122        if e.errno not in _DISCONNECTED:
123            obj.handle_error()
124        else:
125            obj.handle_close()
126    except _reraised_exceptions:
127        raise
128    except:
129        obj.handle_error()
130
131def poll(timeout=0.0, map=None):
132    if map is None:
133        map = socket_map
134    if map:
135        r = []; w = []; e = []
136        for fd, obj in list(map.items()):
137            is_r = obj.readable()
138            is_w = obj.writable()
139            if is_r:
140                r.append(fd)
141            # accepting sockets should not be writable
142            if is_w and not obj.accepting:
143                w.append(fd)
144            if is_r or is_w:
145                e.append(fd)
146        if [] == r == w == e:
147            time.sleep(timeout)
148            return
149
150        r, w, e = select.select(r, w, e, timeout)
151
152        for fd in r:
153            obj = map.get(fd)
154            if obj is None:
155                continue
156            read(obj)
157
158        for fd in w:
159            obj = map.get(fd)
160            if obj is None:
161                continue
162            write(obj)
163
164        for fd in e:
165            obj = map.get(fd)
166            if obj is None:
167                continue
168            _exception(obj)
169
170def poll2(timeout=0.0, map=None):
171    # Use the poll() support added to the select module in Python 2.0
172    if map is None:
173        map = socket_map
174    if timeout is not None:
175        # timeout is in milliseconds
176        timeout = int(timeout*1000)
177    pollster = select.poll()
178    if map:
179        for fd, obj in list(map.items()):
180            flags = 0
181            if obj.readable():
182                flags |= select.POLLIN | select.POLLPRI
183            # accepting sockets should not be writable
184            if obj.writable() and not obj.accepting:
185                flags |= select.POLLOUT
186            if flags:
187                pollster.register(fd, flags)
188
189        r = pollster.poll(timeout)
190        for fd, flags in r:
191            obj = map.get(fd)
192            if obj is None:
193                continue
194            readwrite(obj, flags)
195
196poll3 = poll2                           # Alias for backward compatibility
197
198def loop(timeout=30.0, use_poll=False, map=None, count=None):
199    if map is None:
200        map = socket_map
201
202    if use_poll and hasattr(select, 'poll'):
203        poll_fun = poll2
204    else:
205        poll_fun = poll
206
207    if count is None:
208        while map:
209            poll_fun(timeout, map)
210
211    else:
212        while map and count > 0:
213            poll_fun(timeout, map)
214            count = count - 1
215
216class dispatcher:
217
218    debug = False
219    connected = False
220    accepting = False
221    connecting = False
222    closing = False
223    addr = None
224    ignore_log_types = frozenset({'warning'})
225
226    def __init__(self, sock=None, map=None):
227        if map is None:
228            self._map = socket_map
229        else:
230            self._map = map
231
232        self._fileno = None
233
234        if sock:
235            # Set to nonblocking just to make sure for cases where we
236            # get a socket from a blocking source.
237            sock.setblocking(False)
238            self.set_socket(sock, map)
239            self.connected = True
240            # The constructor no longer requires that the socket
241            # passed be connected.
242            try:
243                self.addr = sock.getpeername()
244            except OSError as err:
245                if err.errno in (ENOTCONN, EINVAL):
246                    # To handle the case where we got an unconnected
247                    # socket.
248                    self.connected = False
249                else:
250                    # The socket is broken in some unknown way, alert
251                    # the user and remove it from the map (to prevent
252                    # polling of broken sockets).
253                    self.del_channel(map)
254                    raise
255        else:
256            self.socket = None
257
258    def __repr__(self):
259        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
260        if self.accepting and self.addr:
261            status.append('listening')
262        elif self.connected:
263            status.append('connected')
264        if self.addr is not None:
265            try:
266                status.append('%s:%d' % self.addr)
267            except TypeError:
268                status.append(repr(self.addr))
269        return '<%s at %#x>' % (' '.join(status), id(self))
270
271    def add_channel(self, map=None):
272        #self.log_info('adding channel %s' % self)
273        if map is None:
274            map = self._map
275        map[self._fileno] = self
276
277    def del_channel(self, map=None):
278        fd = self._fileno
279        if map is None:
280            map = self._map
281        if fd in map:
282            #self.log_info('closing channel %d:%s' % (fd, self))
283            del map[fd]
284        self._fileno = None
285
286    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
287        self.family_and_type = family, type
288        sock = socket.socket(family, type)
289        sock.setblocking(False)
290        self.set_socket(sock)
291
292    def set_socket(self, sock, map=None):
293        self.socket = sock
294        self._fileno = sock.fileno()
295        self.add_channel(map)
296
297    def set_reuse_addr(self):
298        # try to re-use a server port if possible
299        try:
300            self.socket.setsockopt(
301                socket.SOL_SOCKET, socket.SO_REUSEADDR,
302                self.socket.getsockopt(socket.SOL_SOCKET,
303                                       socket.SO_REUSEADDR) | 1
304                )
305        except OSError:
306            pass
307
308    # ==================================================
309    # predicates for select()
310    # these are used as filters for the lists of sockets
311    # to pass to select().
312    # ==================================================
313
314    def readable(self):
315        return True
316
317    def writable(self):
318        return True
319
320    # ==================================================
321    # socket object methods.
322    # ==================================================
323
324    def listen(self, num):
325        self.accepting = True
326        if os.name == 'nt' and num > 5:
327            num = 5
328        return self.socket.listen(num)
329
330    def bind(self, addr):
331        self.addr = addr
332        return self.socket.bind(addr)
333
334    def connect(self, address):
335        self.connected = False
336        self.connecting = True
337        err = self.socket.connect_ex(address)
338        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
339        or err == EINVAL and os.name == 'nt':
340            self.addr = address
341            return
342        if err in (0, EISCONN):
343            self.addr = address
344            self.handle_connect_event()
345        else:
346            raise OSError(err, errorcode[err])
347
348    def accept(self):
349        # XXX can return either an address pair or None
350        try:
351            conn, addr = self.socket.accept()
352        except TypeError:
353            return None
354        except OSError as why:
355            if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
356                return None
357            else:
358                raise
359        else:
360            return conn, addr
361
362    def send(self, data):
363        try:
364            result = self.socket.send(data)
365            return result
366        except OSError as why:
367            if why.errno == EWOULDBLOCK:
368                return 0
369            elif why.errno in _DISCONNECTED:
370                self.handle_close()
371                return 0
372            else:
373                raise
374
375    def recv(self, buffer_size):
376        try:
377            data = self.socket.recv(buffer_size)
378            if not data:
379                # a closed connection is indicated by signaling
380                # a read condition, and having recv() return 0.
381                self.handle_close()
382                return b''
383            else:
384                return data
385        except OSError as why:
386            # winsock sometimes raises ENOTCONN
387            if why.errno in _DISCONNECTED:
388                self.handle_close()
389                return b''
390            else:
391                raise
392
393    def close(self):
394        self.connected = False
395        self.accepting = False
396        self.connecting = False
397        self.del_channel()
398        if self.socket is not None:
399            try:
400                self.socket.close()
401            except OSError as why:
402                if why.errno not in (ENOTCONN, EBADF):
403                    raise
404
405    # log and log_info may be overridden to provide more sophisticated
406    # logging and warning methods. In general, log is for 'hit' logging
407    # and 'log_info' is for informational, warning and error logging.
408
409    def log(self, message):
410        sys.stderr.write('log: %s\n' % str(message))
411
412    def log_info(self, message, type='info'):
413        if type not in self.ignore_log_types:
414            print('%s: %s' % (type, message))
415
416    def handle_read_event(self):
417        if self.accepting:
418            # accepting sockets are never connected, they "spawn" new
419            # sockets that are connected
420            self.handle_accept()
421        elif not self.connected:
422            if self.connecting:
423                self.handle_connect_event()
424            self.handle_read()
425        else:
426            self.handle_read()
427
428    def handle_connect_event(self):
429        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
430        if err != 0:
431            raise OSError(err, _strerror(err))
432        self.handle_connect()
433        self.connected = True
434        self.connecting = False
435
436    def handle_write_event(self):
437        if self.accepting:
438            # Accepting sockets shouldn't get a write event.
439            # We will pretend it didn't happen.
440            return
441
442        if not self.connected:
443            if self.connecting:
444                self.handle_connect_event()
445        self.handle_write()
446
447    def handle_expt_event(self):
448        # handle_expt_event() is called if there might be an error on the
449        # socket, or if there is OOB data
450        # check for the error condition first
451        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
452        if err != 0:
453            # we can get here when select.select() says that there is an
454            # exceptional condition on the socket
455            # since there is an error, we'll go ahead and close the socket
456            # like we would in a subclassed handle_read() that received no
457            # data
458            self.handle_close()
459        else:
460            self.handle_expt()
461
462    def handle_error(self):
463        nil, t, v, tbinfo = compact_traceback()
464
465        # sometimes a user repr method will crash.
466        try:
467            self_repr = repr(self)
468        except:
469            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
470
471        self.log_info(
472            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
473                self_repr,
474                t,
475                v,
476                tbinfo
477                ),
478            'error'
479            )
480        self.handle_close()
481
482    def handle_expt(self):
483        self.log_info('unhandled incoming priority event', 'warning')
484
485    def handle_read(self):
486        self.log_info('unhandled read event', 'warning')
487
488    def handle_write(self):
489        self.log_info('unhandled write event', 'warning')
490
491    def handle_connect(self):
492        self.log_info('unhandled connect event', 'warning')
493
494    def handle_accept(self):
495        pair = self.accept()
496        if pair is not None:
497            self.handle_accepted(*pair)
498
499    def handle_accepted(self, sock, addr):
500        sock.close()
501        self.log_info('unhandled accepted event', 'warning')
502
503    def handle_close(self):
504        self.log_info('unhandled close event', 'warning')
505        self.close()
506
507# ---------------------------------------------------------------------------
508# adds simple buffered output capability, useful for simple clients.
509# [for more sophisticated usage use asynchat.async_chat]
510# ---------------------------------------------------------------------------
511
512class dispatcher_with_send(dispatcher):
513
514    def __init__(self, sock=None, map=None):
515        dispatcher.__init__(self, sock, map)
516        self.out_buffer = b''
517
518    def initiate_send(self):
519        num_sent = 0
520        num_sent = dispatcher.send(self, self.out_buffer[:65536])
521        self.out_buffer = self.out_buffer[num_sent:]
522
523    def handle_write(self):
524        self.initiate_send()
525
526    def writable(self):
527        return (not self.connected) or len(self.out_buffer)
528
529    def send(self, data):
530        if self.debug:
531            self.log_info('sending %s' % repr(data))
532        self.out_buffer = self.out_buffer + data
533        self.initiate_send()
534
535# ---------------------------------------------------------------------------
536# used for debugging.
537# ---------------------------------------------------------------------------
538
539def compact_traceback():
540    exc = sys.exception()
541    tb = exc.__traceback__
542    if not tb: # Must have a traceback
543        raise AssertionError("traceback does not exist")
544    tbinfo = []
545    while tb:
546        tbinfo.append((
547            tb.tb_frame.f_code.co_filename,
548            tb.tb_frame.f_code.co_name,
549            str(tb.tb_lineno)
550            ))
551        tb = tb.tb_next
552
553    # just to be safe
554    del tb
555
556    file, function, line = tbinfo[-1]
557    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
558    return (file, function, line), type(exc), exc, info
559
560def close_all(map=None, ignore_all=False):
561    if map is None:
562        map = socket_map
563    for x in list(map.values()):
564        try:
565            x.close()
566        except OSError as x:
567            if x.errno == EBADF:
568                pass
569            elif not ignore_all:
570                raise
571        except _reraised_exceptions:
572            raise
573        except:
574            if not ignore_all:
575                raise
576    map.clear()
577
578# Asynchronous File I/O:
579#
580# After a little research (reading man pages on various unixen, and
581# digging through the linux kernel), I've determined that select()
582# isn't meant for doing asynchronous file i/o.
583# Heartening, though - reading linux/mm/filemap.c shows that linux
584# supports asynchronous read-ahead.  So _MOST_ of the time, the data
585# will be sitting in memory for us already when we go to read it.
586#
587# What other OS's (besides NT) support async file i/o?  [VMS?]
588#
589# Regardless, this is useful for pipes, and stdin/stdout...
590
591if os.name == 'posix':
592    class file_wrapper:
593        # Here we override just enough to make a file
594        # look like a socket for the purposes of asyncore.
595        # The passed fd is automatically os.dup()'d
596
597        def __init__(self, fd):
598            self.fd = os.dup(fd)
599
600        def __del__(self):
601            if self.fd >= 0:
602                warnings.warn("unclosed file %r" % self, ResourceWarning,
603                              source=self)
604            self.close()
605
606        def recv(self, *args):
607            return os.read(self.fd, *args)
608
609        def send(self, *args):
610            return os.write(self.fd, *args)
611
612        def getsockopt(self, level, optname, buflen=None):
613            if (level == socket.SOL_SOCKET and
614                optname == socket.SO_ERROR and
615                not buflen):
616                return 0
617            raise NotImplementedError("Only asyncore specific behaviour "
618                                      "implemented.")
619
620        read = recv
621        write = send
622
623        def close(self):
624            if self.fd < 0:
625                return
626            fd = self.fd
627            self.fd = -1
628            os.close(fd)
629
630        def fileno(self):
631            return self.fd
632
633    class file_dispatcher(dispatcher):
634
635        def __init__(self, fd, map=None):
636            dispatcher.__init__(self, None, map)
637            self.connected = True
638            try:
639                fd = fd.fileno()
640            except AttributeError:
641                pass
642            self.set_file(fd)
643            # set it to non-blocking mode
644            os.set_blocking(fd, False)
645
646        def set_file(self, fd):
647            self.socket = file_wrapper(fd)
648            self._fileno = self.socket.fileno()
649            self.add_channel()
650