• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- Mode: Python -*-
2#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3#   Author: Sam Rushing <rushing@nightmare.com>
4
5# ======================================================================
6# Copyright 1996 by Sam Rushing
7#
8#                         All Rights Reserved
9#
10# Permission to use, copy, modify, and distribute this software and
11# its documentation for any purpose and without fee is hereby
12# granted, provided that the above copyright notice appear in all
13# copies and that both that copyright notice and this permission
14# notice appear in supporting documentation, and that the name of Sam
15# Rushing not be used in advertising or publicity pertaining to
16# distribution of the software without specific, written prior
17# permission.
18#
19# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26# ======================================================================
27
28"""Basic infrastructure for asynchronous socket service clients and servers.
29
30There are only two ways to have a program on a single processor do "more
31than one thing at a time".  Multi-threaded programming is the simplest and
32most popular way to do it, but there is another very different technique,
33that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. It's really only practical if your program
35is largely I/O bound. If your program is CPU bound, then pre-emptive
36scheduled threads are probably what you really need. Network servers are
37rarely CPU-bound, however.
38
39If your operating system supports the select() system call in its I/O
40library (and nearly all do), then you can use it to juggle multiple
41communication channels at once; doing other work while your I/O is taking
42place in the "background."  Although this strategy can seem strange and
43complex, especially at first, it is in many ways easier to understand and
44control than multi-threaded programming. The module documented here solves
45many of the difficult problems for you, making the task of building
46sophisticated high-performance network servers and clients a snap.
47"""
48
49import select
50import socket
51import sys
52import time
53import warnings
54
55import os
56from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
58     errorcode
59
# Emitted once at import time.  stacklevel=2 attributes the warning to the
# module that imports asyncore rather than to this file.
warnings.warn(
    'The asyncore module is deprecated. '
    'The recommended replacement is asyncio',
    DeprecationWarning,
    stacklevel=2)


# errno values that the channel code in this module treats as "the peer is
# gone": dispatcher.send(), dispatcher.recv() and readwrite() respond to
# them by calling handle_close() instead of reporting an error.
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF})

# Default channel map (fd -> dispatcher).  An existing binding of the name
# is kept; a fresh dict is only created when the name is not yet defined
# (presumably so that re-executing the module keeps live channels).
try:
    socket_map
except NameError:
    socket_map = {}
74
def _strerror(err):
    """Return a human-readable message for the errno value *err*.

    os.strerror() is tried first.  When it rejects the value (or is not
    available), the symbolic name from errno.errorcode is returned, and
    failing that a generic "Unknown error N" string.
    """
    try:
        return os.strerror(err)
    except (ValueError, OverflowError, NameError):
        symbolic = errorcode.get(err)
        if symbolic is None:
            return "Unknown error %s" %err
        return symbolic
82
class ExitNow(Exception):
    """Raised by a handler to break out of the polling machinery.

    It is one of the exceptions the dispatch helpers re-raise instead of
    routing to handle_error().
    """


# Exceptions that must propagate out of the dispatch helpers untouched.
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
87
def read(obj):
    """Deliver a read event to *obj*.

    Exceptions listed in _reraised_exceptions propagate to the caller;
    anything else raised by the handler is passed to obj.handle_error().
    """
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except BaseException:
        obj.handle_error()
95
def write(obj):
    """Deliver a write event to *obj*.

    Exceptions listed in _reraised_exceptions propagate to the caller;
    anything else raised by the handler is passed to obj.handle_error().
    """
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except BaseException:
        obj.handle_error()
103
def _exception(obj):
    """Deliver an exceptional-condition event to *obj*.

    Exceptions listed in _reraised_exceptions propagate to the caller;
    anything else raised by the handler is passed to obj.handle_error().
    """
    try:
        obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except BaseException:
        obj.handle_error()
111
def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to the handlers of *obj*.

    Read, write and priority events are delivered in that order, followed
    by handle_close() when a hang-up, error or invalid-fd bit is set.  An
    OSError whose errno is in _DISCONNECTED closes the channel; any other
    failure (apart from _reraised_exceptions, which propagate) goes to
    obj.handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        hangup_mask = select.POLLHUP | select.POLLERR | select.POLLNVAL
        if flags & hangup_mask:
            obj.handle_close()
    except OSError as exc:
        if exc.errno in _DISCONNECTED:
            obj.handle_close()
        else:
            obj.handle_error()
    except _reraised_exceptions:
        raise
    except BaseException:
        obj.handle_error()
131
def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    *map* defaults to the module-level socket_map.  Each channel is asked
    whether it is readable/writable; the resulting descriptor lists are
    handed to select.select() and every ready descriptor is dispatched to
    read(), write() or _exception().  When no channel wants any event the
    function just sleeps for *timeout* seconds.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers, writers, excepts = [], [], []
    for fd, obj in list(map.items()):
        wants_read = obj.readable()
        wants_write = obj.writable()
        if wants_read:
            readers.append(fd)
        # accepting sockets should not be writable
        if wants_write and not obj.accepting:
            writers.append(fd)
        if wants_read or wants_write:
            excepts.append(fd)

    if not (readers or writers or excepts):
        # Nothing to wait on: honour the timeout without calling select().
        time.sleep(timeout)
        return

    readers, writers, excepts = select.select(readers, writers, excepts,
                                              timeout)

    # A handler may have removed a channel from the map in the meantime,
    # so every descriptor is looked up again before it is dispatched.
    for fd in readers:
        obj = map.get(fd)
        if obj is not None:
            read(obj)

    for fd in writers:
        obj = map.get(fd)
        if obj is not None:
            write(obj)

    for fd in excepts:
        obj = map.get(fd)
        if obj is not None:
            _exception(obj)
170
def poll2(timeout=0.0, map=None):
    """Run one select.poll()-based pass over the channels in *map*.

    *map* defaults to the module-level socket_map.  *timeout* is given in
    seconds (or None) and converted to the integer milliseconds that the
    poll object expects.  Every reported (fd, flags) pair is dispatched
    through readwrite().
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if not map:
        return

    for fd, obj in list(map.items()):
        mask = 0
        if obj.readable():
            mask |= select.POLLIN | select.POLLPRI
        # accepting sockets should not be writable
        if obj.writable() and not obj.accepting:
            mask |= select.POLLOUT
        if mask:
            pollster.register(fd, mask)

    for fd, flags in pollster.poll(timeout):
        obj = map.get(fd)
        if obj is not None:
            readwrite(obj, flags)

poll3 = poll2                           # Alias for backward compatibility
198
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll the channels in *map* until it is empty or *count* runs out.

    *map* defaults to the module-level socket_map.  poll2() is used when
    *use_poll* is true and the select module provides poll(); otherwise
    the select()-based poll() is used.  With *count* set to None the loop
    runs until the map is empty; otherwise at most *count* passes are made.
    """
    if map is None:
        map = socket_map

    poll_fun = poll2 if use_poll and hasattr(select, 'poll') else poll

    if count is None:
        while map:
            poll_fun(timeout, map)
        return

    while map and count > 0:
        poll_fun(timeout, map)
        count -= 1
216
class dispatcher:
    """Wrapper around a non-blocking socket registered in a channel map.

    The polling functions of this module ask each registered channel
    whether it is readable()/writable() and then call the
    handle_*_event() methods, which in turn invoke the overridable
    handle_read/handle_write/handle_connect/handle_accept/handle_close
    hooks.  The default hooks only log a warning.
    """

    # State flags; class-level defaults are shadowed per instance as the
    # channel changes state.
    debug = False
    connected = False
    accepting = False
    connecting = False
    closing = False
    addr = None
    # log_info() drops messages whose type is listed here.
    ignore_log_types = frozenset({'warning'})

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally wrapping the existing socket *sock*.

        *map* is the channel map to register in; None selects the
        module-level socket_map.  An OSError from sock.getpeername() other
        than ENOTCONN/EINVAL is re-raised after the channel has been
        removed from the map again.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(False)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.errno in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair; fall back to its repr.
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel under its file descriptor in *map*."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (if present) and forget its fd."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a new non-blocking socket and register it as this channel."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(False)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the channel's socket and add the channel to *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Enable SO_REUSEADDR on the socket; an OSError is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except OSError:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return True if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On os.name == 'nt' the backlog is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        A connect that is still in progress just records the address; an
        immediate success runs handle_connect_event().  Any other result
        of connect_ex() raises OSError carrying that errno.
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
        or err == EINVAL and os.name == 'nt':
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # errorcode has no entry for every possible errno; use a
            # fallback text so the caller always gets an OSError rather
            # than a KeyError from the lookup.
            raise OSError(err, errorcode.get(err, "Unknown error %s" % err))

    def accept(self):
        """Accept a connection; return (conn, addr) or None.

        None is returned when socket.accept() raises TypeError or an
        OSError with EWOULDBLOCK, ECONNABORTED or EAGAIN; other OSErrors
        propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes written.

        Returns 0 on EWOULDBLOCK.  An errno in _DISCONNECTED triggers
        handle_close() and also returns 0; other OSErrors propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            if why.errno == EWOULDBLOCK:
                return 0
            elif why.errno in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or an OSError whose errno is in _DISCONNECTED,
        triggers handle_close() and returns b''; other OSErrors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.errno in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Reset the state flags, leave the map and close the socket.

        ENOTCONN and EBADF from socket.close() are ignored.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.errno not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write *message* to sys.stderr with a 'log: ' prefix."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print 'type: message' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a read event into accept, connect and/or read hooks."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises OSError when SO_ERROR reports a failure; otherwise calls
        handle_connect() and then marks the channel connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Translate a write event into connect and/or write hooks."""
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Close the channel on a socket error, else call handle_expt()."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled, then handle_close()."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    # Default hooks: each one only logs a warning (which log_info() drops
    # by default, because 'warning' is in ignore_log_types).

    def handle_expt(self):
        """Hook for priority data; the default only logs a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Hook for readable data; the default only logs a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Hook for a writable socket; the default only logs a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Hook for a completed connect; the default only logs a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a pending connection and pass it to handle_accepted()."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Hook for a new connection; the default closes *sock* and warns."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Hook for a closed connection; the default warns and close()s."""
        self.log_info('unhandled close event', 'warning')
        self.close()
507
508# ---------------------------------------------------------------------------
509# adds simple buffered output capability, useful for simple clients.
510# [for more sophisticated usage use asynchat.async_chat]
511# ---------------------------------------------------------------------------
512
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing byte buffer.

    send() appends to out_buffer and tries to flush it immediately; what
    could not be written is retried from handle_write().
    """

    def __init__(self, sock=None, map=None):
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Write at most 65536 buffered bytes and drop what was sent."""
        chunk = self.out_buffer[:65536]
        sent = dispatcher.send(self, chunk)
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        """Truthy while not yet connected or while data is still buffered."""
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to flush the buffer."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
535
536# ---------------------------------------------------------------------------
537# used for debugging.
538# ---------------------------------------------------------------------------
539
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), type, value, info) where the first
    item describes the innermost traceback entry and *info* is a single
    string with one '[file|function|line]' group per traceback entry.
    Raises AssertionError when there is no traceback.
    """
    exc_type, exc_value, trace = sys.exc_info()
    if not trace: # Must have a traceback
        raise AssertionError("traceback does not exist")

    entries = []
    while trace:
        code = trace.tb_frame.f_code
        entries.append((code.co_filename, code.co_name, str(trace.tb_lineno)))
        trace = trace.tb_next

    # just to be safe
    del trace

    innermost = entries[-1]
    summary = ' '.join('[%s|%s|%s]' % entry for entry in entries)
    return innermost, exc_type, exc_value, summary
559
def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and then empty the map.

    *map* defaults to the module-level socket_map.  An OSError with errno
    EBADF is always ignored.  Other errors propagate unless *ignore_all*
    is true; exceptions in _reraised_exceptions always propagate.  The map
    is only cleared when no exception escaped.
    """
    if map is None:
        map = socket_map
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.errno != EBADF and not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except BaseException:
            if not ignore_all:
                raise
    map.clear()
577
578# Asynchronous File I/O:
579#
580# After a little research (reading man pages on various unixen, and
581# digging through the linux kernel), I've determined that select()
582# isn't meant for doing asynchronous file i/o.
583# Heartening, though - reading linux/mm/filemap.c shows that linux
584# supports asynchronous read-ahead.  So _MOST_ of the time, the data
585# will be sitting in memory for us already when we go to read it.
586#
587# What other OS's (besides NT) support async file i/o?  [VMS?]
588#
589# Regardless, this is useful for pipes, and stdin/stdout...
590
591if os.name == 'posix':
592    class file_wrapper:
593        # Here we override just enough to make a file
594        # look like a socket for the purposes of asyncore.
595        # The passed fd is automatically os.dup()'d
596
597        def __init__(self, fd):
598            self.fd = os.dup(fd)
599
600        def __del__(self):
601            if self.fd >= 0:
602                warnings.warn("unclosed file %r" % self, ResourceWarning,
603                              source=self)
604            self.close()
605
606        def recv(self, *args):
607            return os.read(self.fd, *args)
608
609        def send(self, *args):
610            return os.write(self.fd, *args)
611
612        def getsockopt(self, level, optname, buflen=None):
613            if (level == socket.SOL_SOCKET and
614                optname == socket.SO_ERROR and
615                not buflen):
616                return 0
617            raise NotImplementedError("Only asyncore specific behaviour "
618                                      "implemented.")
619
620        read = recv
621        write = send
622
623        def close(self):
624            if self.fd < 0:
625                return
626            fd = self.fd
627            self.fd = -1
628            os.close(fd)
629
630        def fileno(self):
631            return self.fd
632
633    class file_dispatcher(dispatcher):
634
635        def __init__(self, fd, map=None):
636            dispatcher.__init__(self, None, map)
637            self.connected = True
638            try:
639                fd = fd.fileno()
640            except AttributeError:
641                pass
642            self.set_file(fd)
643            # set it to non-blocking mode
644            os.set_blocking(fd, False)
645
646        def set_file(self, fd):
647            self.socket = file_wrapper(fd)
648            self._fileno = self.socket.fileno()
649            self.add_channel()
650